[device/celestica]: Update new platform APIs (#3415)

* [device/celestica]: Update fan and psu apis

* [device/celestica]: Update sfp apis
This commit is contained in:
Wirut Getbamrung 2019-09-07 04:58:12 +07:00 committed by lguohan
parent 309af83225
commit 150ee07dd2
8 changed files with 1380 additions and 741 deletions

View File

@ -26,7 +26,8 @@ try:
except ImportError as e:
raise ImportError(str(e) + "- required module not found")
NUM_FAN = 3
NUM_FAN_TRAY = 3
NUM_FAN = 1
NUM_PSU = 2
NUM_THERMAL = 7
NUM_SFP = 52
@ -42,9 +43,10 @@ class Chassis(ChassisBase):
def __init__(self):
self.config_data = {}
for index in range(0, NUM_FAN):
fan = Fan(index)
self._fan_list.append(fan)
for fant_index in range(0, NUM_FAN_TRAY):
for fan_index in range(0, NUM_FAN):
fan = Fan(fant_index, fan_index)
self._fan_list.append(fan)
for index in range(0, NUM_PSU):
psu = Psu(index)
self._psu_list.append(psu)

View File

@ -17,35 +17,50 @@ try:
except ImportError as e:
raise ImportError(str(e) + "- required module not found")
EMC2305_FAN_PATH = "/sys/bus/i2c/drivers/emc2305/"
EMC2305_PATH = "/sys/bus/i2c/drivers/emc2305/"
FAN_PATH = "/sys/devices/platform/e1031.smc/"
SYS_GPIO_DIR = "/sys/class/gpio"
EMC2305_MAX_PWM = 255
EMC2305_FAN_PWM = "pwm{}"
EMC2305_FAN_TARGET = "fan{}_target"
EMC2305_FAN_INPUT = "pwm{}"
FAN_NAME_LIST = ["FAN-1", "FAN-2", "FAN-3"]
PSU_FAN_MAX_RPM = 11000
PSU_HWMON_PATH = "/sys/bus/i2c/devices/i2c-{0}/{0}-00{1}/hwmon"
PSU_I2C_MAPPING = {
0: {
"num": 13,
"addr": "5b"
},
1: {
"num": 12,
"addr": "5a"
},
}
class Fan(FanBase):
"""Platform-specific Fan class"""
def __init__(self, fan_index):
self.index = fan_index
self.config_data = {}
self.fan_speed = 0
FanBase.__init__(self)
def __init__(self, fan_tray_index, fan_index=0, is_psu_fan=False, psu_index=0):
self.fan_index = fan_index
self.fan_tray_index = fan_tray_index
self.is_psu_fan = is_psu_fan
if self.is_psu_fan:
self.psu_index = psu_index
self.psu_i2c_num = PSU_I2C_MAPPING[self.psu_index]["num"]
self.psu_i2c_addr = PSU_I2C_MAPPING[self.psu_index]["addr"]
self.psu_hwmon_path = PSU_HWMON_PATH.format(
self.psu_i2c_num, self.psu_i2c_addr)
# e1031 fan attributes
# Single emc2305 chip located at i2c-23-4d
# to control a fan module
self.e1031_emc2305_chip = [
self.emc2305_chip_mapping = [
{
'device': "23-004d",
'index_map': [1, 2, 4]
}
]
self.fan_e1031_presence = "fan{}_prs"
self.fan_e1031_direction = "fan{}_dir"
self.fan_e1031_led = "fan{}_led"
@ -56,84 +71,103 @@ class Fan(FanBase):
}
FanBase.__init__(self)
def get_direction(self):
direction = self.FAN_DIRECTION_INTAKE
def __read_txt_file(self, file_path):
try:
fan_direction_file = (FAN_PATH +
self.fan_e1031_direction.format(self.index+1))
with open(fan_direction_file, 'r') as file:
raw = file.read().strip('\r\n')
if str(raw).upper() == "F2B":
direction = self.FAN_DIRECTION_INTAKE
else:
direction = self.FAN_DIRECTION_EXHAUST
with open(file_path, 'r') as fd:
data = fd.read()
return data.strip()
except IOError:
pass
return ""
def __write_txt_file(self, file_path, value):
try:
with open(file_path, 'w') as fd:
fd.write(str(value))
except:
return False
return True
def __search_file_by_name(self, directory, file_name):
for dirpath, dirnames, files in os.walk(directory):
for name in files:
file_path = os.path.join(dirpath, name)
if name in file_name:
return file_path
return None
def get_direction(self):
"""
Retrieves the direction of fan
Returns:
A string, either FAN_DIRECTION_INTAKE or FAN_DIRECTION_EXHAUST
depending on fan direction
"""
direction = self.FAN_DIRECTION_EXHAUST
if not self.is_psu_fan:
fan_direction_file = (FAN_PATH +
self.fan_e1031_direction.format(self.fan_tray_index+1))
raw = self.__read_txt_file(fan_direction_file).strip('\r\n')
direction = self.FAN_DIRECTION_INTAKE if str(
raw).upper() == "F2B" else self.FAN_DIRECTION_EXHAUST
return direction
def get_speed(self):
"""
E1031 platform specific data:
Retrieves the speed of fan as a percentage of full speed
Returns:
An integer, the percentage of full fan speed, in the range 0 (off)
to 100 (full speed)
Note:
speed = pwm_in/255*100
"""
# TODO: Seperate PSU's fan and main fan class
if self.fan_speed != 0:
return self.fan_speed
else:
speed = 0
pwm = []
emc2305_chips = self.e1031_emc2305_chip
speed = 0
if self.is_psu_fan:
fan_speed_sysfs_name = "fan{}_input".format(self.fan_index+1)
fan_speed_sysfs_path = self.__search_file_by_name(
self.psu_hwmon_path, fan_speed_sysfs_name)
fan_speed_rpm = self.__read_txt_file(fan_speed_sysfs_path) or 0
fan_speed_raw = float(fan_speed_rpm)/PSU_FAN_MAX_RPM * 100
speed = math.ceil(float(fan_speed_rpm) * 100 / PSU_FAN_MAX_RPM)
elif self.get_presence():
chip = self.emc2305_chip_mapping[self.fan_index]
device = chip['device']
fan_index = chip['index_map']
sysfs_path = "%s%s/%s" % (
EMC2305_PATH, device, EMC2305_FAN_INPUT)
sysfs_path = sysfs_path.format(fan_index[self.fan_tray_index])
raw = self.__read_txt_file(sysfs_path).strip('\r\n')
pwm = int(raw, 10) if raw else 0
speed = math.ceil(float(pwm * 100 / EMC2305_MAX_PWM))
for chip in emc2305_chips:
device = chip['device']
fan_index = chip['index_map']
sysfs_path = "%s%s/%s" % (
EMC2305_FAN_PATH, device, EMC2305_FAN_INPUT)
sysfs_path = sysfs_path.format(fan_index[self.index])
try:
with open(sysfs_path, 'r') as file:
raw = file.read().strip('\r\n')
pwm.append(int(raw, 10))
except IOError:
raise IOError("Unable to open " + sysfs_path)
speed = math.ceil(
float(pwm[0]) * 100 / EMC2305_MAX_PWM)
return int(speed)
return int(speed)
def get_target_speed(self):
"""
E1031 platform specific data:
Retrieves the target (expected) speed of the fan
Returns:
An integer, the percentage of full fan speed, in the range 0 (off)
to 100 (full speed)
Note:
speed_pc = pwm_target/255*100
0 : when PWM mode is use
pwm : when pwm mode is not use
"""
target = 0
pwm = []
emc2305_chips = self.e1031_emc2305_chip
for chip in emc2305_chips:
if not self.is_psu_fan:
chip = self.emc2305_chip_mapping[self.fan_index]
device = chip['device']
fan_index = chip['index_map']
sysfs_path = "%s%s/%s" % (
EMC2305_FAN_PATH, device, EMC2305_FAN_TARGET)
sysfs_path = sysfs_path.format(fan_index[self.index])
try:
with open(sysfs_path, 'r') as file:
raw = file.read().strip('\r\n')
pwm.append(int(raw, 10))
except IOError:
raise IOError("Unable to open " + sysfs_path)
target = pwm[0] * 100 / EMC2305_MAX_PWM
EMC2305_PATH, device, EMC2305_FAN_TARGET)
sysfs_path = sysfs_path.format(fan_index[self.fan_tray_index])
raw = self.__read_txt_file(sysfs_path).strip('\r\n')
pwm = int(raw, 10) if raw else 0
target = math.ceil(float(pwm) * 100 / EMC2305_MAX_PWM)
return target
@ -148,40 +182,50 @@ class Fan(FanBase):
def set_speed(self, speed):
"""
Depends on pwm or target mode is selected:
Sets the fan speed
Args:
speed: An integer, the percentage of full fan speed to set fan to,
in the range 0 (off) to 100 (full speed)
Returns:
A boolean, True if speed is set successfully, False if not
Note:
Depends on pwm or target mode is selected:
1) pwm = speed_pc * 255 <-- Currently use this mode.
2) target_pwm = speed_pc * 100 / 255
2.1) set pwm{}_enable to 3
"""
pwm = speed * 255 / 100
emc2305_chips = self.e1031_emc2305_chip
for chip in emc2305_chips:
if not self.is_psu_fan and self.get_presence():
chip = self.emc2305_chip_mapping[self.fan_index]
device = chip['device']
fan_index = chip['index_map']
sysfs_path = "%s%s/%s" % (
EMC2305_FAN_PATH, device, EMC2305_FAN_PWM)
sysfs_path = sysfs_path.format(fan_index[self.index])
try:
with open(sysfs_path, 'w') as file:
file.write(str(int(pwm)))
except IOError:
return False
EMC2305_PATH, device, EMC2305_FAN_PWM)
sysfs_path = sysfs_path.format(fan_index[self.fan_tray_index])
return self.__write_txt_file(sysfs_path, int(pwm))
return True
return False
def set_status_led(self, color):
try:
"""
Sets the state of the fan module status LED
Args:
color: A string representing the color with which to set the
fan module status LED
Returns:
bool: True if status LED state is set successfully, False if not
"""
set_status_led = False
if not self.is_psu_fan:
fan_led_file = (FAN_PATH +
self.fan_e1031_led.format(self.index+1))
with open(fan_led_file, 'r') as file:
file.write(self.fan_e1031_led_col_map[color])
except IOError:
return False
self.fan_e1031_led.format(self.fan_tray_index+1))
return True
set_status_led = self.__write_txt_file(
fan_led_file, self.fan_e1031_led_col_map[color]) if self.get_presence() else False
return set_status_led
def get_name(self):
"""
@ -189,7 +233,10 @@ class Fan(FanBase):
Returns:
string: The name of the device
"""
return FAN_NAME_LIST[self.index]
fan_name = FAN_NAME_LIST[self.fan_tray_index] if not self.is_psu_fan else "PSU-{} FAN-{}".format(
self.psu_index+1, self.fan_index+1)
return fan_name
def get_presence(self):
"""
@ -197,13 +244,8 @@ class Fan(FanBase):
Returns:
bool: True if PSU is present, False if not
"""
fan_direction_file = (FAN_PATH +
self.fan_e1031_presence.format(self.fan_tray_index+1))
present_str = self.__read_txt_file(fan_direction_file) or '1'
try:
fan_direction_file = (FAN_PATH +
self.fan_e1031_presence.format(self.index+1))
with open(fan_direction_file, 'r') as file:
present = int(file.read().strip('\r\n'))
except IOError:
return False
return present == 0
return int(present_str) == 0 if not self.is_psu_fan else True

View File

@ -18,8 +18,20 @@ except ImportError as e:
raise ImportError(str(e) + "- required module not found")
FAN_E1031_SPEED_PATH = "/sys/class/hwmon/hwmon{}/fan1_input"
HWMON_PATH = "/sys/bus/i2c/devices/i2c-{0}/{0}-00{1}/hwmon"
FAN_MAX_RPM = 11000
PSU_NAME_LIST = ["PSU-R", "PSU-L"]
PSU_NUM_FAN = [1, 1]
PSU_I2C_MAPPING = {
0: {
"num": 13,
"addr": "5b"
},
1: {
"num": 12,
"addr": "5a"
},
}
class Psu(PsuBase):
@ -31,26 +43,109 @@ class Psu(PsuBase):
self.psu_path = "/sys/devices/platform/e1031.smc/"
self.psu_presence = "psu{}_prs"
self.psu_oper_status = "psu{}_status"
self.i2c_num = PSU_I2C_MAPPING[self.index]["num"]
self.i2c_addr = PSU_I2C_MAPPING[self.index]["addr"]
self.hwmon_path = HWMON_PATH.format(self.i2c_num, self.i2c_addr)
for fan_index in range(0, PSU_NUM_FAN[self.index]):
fan = Fan(fan_index, 0, is_psu_fan=True, psu_index=self.index)
self._fan_list.append(fan)
PsuBase.__init__(self)
def get_fan(self):
"""
Retrieves object representing the fan module contained in this PSU
Returns:
An object dervied from FanBase representing the fan module
contained in this PSU
"""
fan_speed_path = FAN_E1031_SPEED_PATH.format(
str(self.index+3))
def __read_txt_file(self, file_path):
try:
with open(fan_speed_path) as fan_speed_file:
fan_speed_rpm = int(fan_speed_file.read())
with open(file_path, 'r') as fd:
data = fd.read()
return data.strip()
except IOError:
fan_speed = 0
pass
return ""
fan_speed = float(fan_speed_rpm)/FAN_MAX_RPM * 100
fan = Fan(0)
fan.fan_speed = int(fan_speed) if int(fan_speed) <= 100 else 100
return fan
def __search_file_by_contain(self, directory, search_str, file_start):
for dirpath, dirnames, files in os.walk(directory):
for name in files:
file_path = os.path.join(dirpath, name)
if name.startswith(file_start) and search_str in self.__read_txt_file(file_path):
return file_path
return None
def get_voltage(self):
"""
Retrieves current PSU voltage output
Returns:
A float number, the output voltage in volts,
e.g. 12.1
"""
psu_voltage = 0.0
voltage_name = "in{}_input"
voltage_label = "vout1"
vout_label_path = self.__search_file_by_contain(
self.hwmon_path, voltage_label, "in")
if vout_label_path:
dir_name = os.path.dirname(vout_label_path)
basename = os.path.basename(vout_label_path)
in_num = filter(str.isdigit, basename)
vout_path = os.path.join(
dir_name, voltage_name.format(in_num))
vout_val = self.__read_txt_file(vout_path)
psu_voltage = float(vout_val) / 1000
return psu_voltage
def get_current(self):
"""
Retrieves present electric current supplied by PSU
Returns:
A float number, the electric current in amperes, e.g 15.4
"""
psu_current = 0.0
current_name = "curr{}_input"
current_label = "iout1"
curr_label_path = self.__search_file_by_contain(
self.hwmon_path, current_label, "cur")
if curr_label_path:
dir_name = os.path.dirname(curr_label_path)
basename = os.path.basename(curr_label_path)
cur_num = filter(str.isdigit, basename)
cur_path = os.path.join(
dir_name, current_name.format(cur_num))
cur_val = self.__read_txt_file(cur_path)
psu_current = float(cur_val) / 1000
return psu_current
def get_power(self):
"""
Retrieves current energy supplied by PSU
Returns:
A float number, the power in watts, e.g. 302.6
"""
psu_power = 0.0
current_name = "power{}_input"
current_label = "pout1"
pw_label_path = self.__search_file_by_contain(
self.hwmon_path, current_label, "power")
if pw_label_path:
dir_name = os.path.dirname(pw_label_path)
basename = os.path.basename(pw_label_path)
pw_num = filter(str.isdigit, basename)
pw_path = os.path.join(
dir_name, current_name.format(pw_num))
pw_val = self.__read_txt_file(pw_path)
psu_power = float(pw_val) / 1000000
return psu_power
def get_powergood_status(self):
"""
Retrieves the powergood status of PSU
Returns:
A boolean, True if PSU has stablized its output voltages and passed all
its internal self-tests, False if not.
"""
return self.get_status()
def set_status_led(self, color):
"""
@ -64,6 +159,15 @@ class Psu(PsuBase):
# Hardware not supported
return False
def get_status_led(self):
"""
Gets the state of the PSU status LED
Returns:
A string, one of the predefined STATUS_LED_COLOR_* strings above
"""
# Hardware not supported
return self.STATUS_LED_COLOR_OFF
def get_name(self):
"""
Retrieves the name of the device
@ -79,14 +183,10 @@ class Psu(PsuBase):
bool: True if PSU is present, False if not
"""
psu_location = ["R", "L"]
status = 0
try:
with open(self.psu_path + self.psu_presence.format(psu_location[self.index]), 'r') as psu_prs:
status = int(psu_prs.read())
except IOError:
return False
presences_status = self.__read_txt_file(
self.psu_path + self.psu_presence.format(psu_location[self.index])) or 0
return status == 1
return int(presences_status) == 1
def get_status(self):
"""
@ -95,11 +195,7 @@ class Psu(PsuBase):
A boolean value, True if device is operating properly, False if not
"""
psu_location = ["R", "L"]
status = 0
try:
with open(self.psu_path + self.psu_oper_status.format(psu_location[self.index]), 'r') as power_status:
status = int(power_status.read())
except IOError:
return False
power_status = self.__read_txt_file(
self.psu_path + self.psu_oper_status.format(psu_location[self.index])) or 0
return status == 1
return int(power_status) == 1

View File

@ -15,16 +15,65 @@ import sonic_device_util
from ctypes import create_string_buffer
try:
from swsssdk import ConfigDBConnector
from sonic_platform_base.sfp_base import SfpBase
from sonic_platform_base.sonic_sfp.sfputilbase import SfpUtilBase
from sonic_platform_base.sonic_sfp.sff8472 import sff8472Dom
from sonic_platform_base.sonic_sfp.sff8472 import sff8472InterfaceId
from sonic_platform_base.sonic_sfp.sff8472 import sffbase
from sonic_platform_base.sonic_sfp.sfputilhelper import SfpUtilHelper
except ImportError as e:
raise ImportError(str(e) + "- required module not found")
INFO_OFFSET = 0
DOM_OFFSET = 256
class Sfp(SfpBase, SfpUtilBase):
XCVR_INTFACE_BULK_OFFSET = 0
XCVR_INTFACE_BULK_WIDTH_SFP = 21
XCVR_VENDOR_NAME_OFFSET = 20
XCVR_VENDOR_NAME_WIDTH = 16
XCVR_VENDOR_OUI_OFFSET = 37
XCVR_VENDOR_OUI_WIDTH = 3
XCVR_VENDOR_PN_OFFSET = 40
XCVR_VENDOR_PN_WIDTH = 16
XCVR_HW_REV_OFFSET = 56
XCVR_HW_REV_WIDTH_SFP = 4
XCVR_VENDOR_SN_OFFSET = 68
XCVR_VENDOR_SN_WIDTH = 16
XCVR_VENDOR_DATE_OFFSET = 84
XCVR_VENDOR_DATE_WIDTH = 8
XCVR_DOM_CAPABILITY_OFFSET = 92
XCVR_DOM_CAPABILITY_WIDTH = 1
# Offset for values in SFP eeprom
SFP_TEMPE_OFFSET = 96
SFP_TEMPE_WIDTH = 2
SFP_VOLT_OFFSET = 98
SFP_VOLT_WIDTH = 2
SFP_CHANNL_MON_OFFSET = 100
SFP_CHANNL_MON_WIDTH = 6
SFP_MODULE_THRESHOLD_OFFSET = 0
SFP_MODULE_THRESHOLD_WIDTH = 40
SFP_CHANNL_THRESHOLD_OFFSET = 112
SFP_CHANNL_THRESHOLD_WIDTH = 2
SFP_STATUS_CONTROL_OFFSET = 110
SFP_STATUS_CONTROL_WIDTH = 1
SFP_TX_DISABLE_HARD_BIT = 7
SFP_TX_DISABLE_SOFT_BIT = 6
sfp_cable_length_tup = ('LengthSMFkm-UnitsOfKm', 'LengthSMF(UnitsOf100m)',
'Length50um(UnitsOf10m)', 'Length62.5um(UnitsOfm)',
'LengthCable(UnitsOfm)', 'LengthOM3(UnitsOf10m)')
sfp_compliance_code_tup = ('10GEthernetComplianceCode', 'InfinibandComplianceCode',
'ESCONComplianceCodes', 'SONETComplianceCodes',
'EthernetComplianceCodes', 'FibreChannelLinkLength',
'FibreChannelTechnology', 'SFP+CableTechnology',
'FibreChannelTransmissionMedia', 'FibreChannelSpeed')
class Sfp(SfpBase):
"""Platform-specific Sfp class"""
# Port number
PORT_START = 1
PORT_END = 52
port_to_i2c_mapping = {
@ -33,54 +82,39 @@ class Sfp(SfpBase, SfpUtilBase):
51: 17,
52: 16
}
_sfp_port = range(49, PORT_END + 1)
PRS_PATH = "/sys/devices/platform/e1031.smc/SFP/sfp_modabs"
PLATFORM_ROOT_PATH = '/usr/share/sonic/device'
PMON_HWSKU_PATH = '/usr/share/sonic/hwsku'
SFP_STATUS_CONTROL_OFFSET = 110
SFP_STATUS_CONTROL_WIDTH = 1
_port_to_eeprom_mapping = {}
_sfp_port = range(49, PORT_END + 1)
SFP_EEPROM_TYPE_KEY = "TypeOfTransceiver"
SFP_EEPROM_HW_REV_KEY = "VendorRev"
SFP_EEPROM_MF_NAME_KEY = "VendorName"
SFP_EEPROM_MODEL_NAME_KEY = "VendorPN"
SFP_EEPROM_SERIAL_KEY = "VendorSN"
SFP_EEPROM_CONNECTOR_KEY = "Connector"
SFP_EEPROM_ENCODE_KEY = "EncodingCodes"
SFP_EEPROM_EXT_IDENT_KEY = "ExtIdentOfTypeOfTransceiver"
SFP_EEPROM_CABLE_KEY = "LengthCable(UnitsOfm)"
SFP_EEPROM_BIT_RATE_KEY = "NominalSignallingRate(UnitsOf100Mbd)"
SFP_EEPROM_SPEC_COM_KEY = "Specification compliance"
SFP_EEPROM_DATE_KEY = "VendorDataCode(YYYY-MM-DD Lot)"
SFP_EEPROM_OUI_KEY = "VendorOUI"
SFP_EEPROM_MON_DATA_KEY = "MonitorData"
SFP_EEPROM_TEMP_KEY = "Temperature"
SFP_EEPROM_VCC_KEY = "Vcc"
SFP_EEPROM_RX_PWR_KEY = "RXPower"
SFP_EEPROM_TX_PWR_KEY = "TXPower"
SFP_EEPROM_TX_BS_KEY = "TXBias"
SFP_EEPROM_STATUS_CON_KEY = "StatusControl"
PLATFORM = "x86_64-cel_e1031-r0"
HWSKU = "Celestica-E1031-T48S4"
HOST_CHK_CMD = "docker > /dev/null 2>&1"
@property
def port_start(self):
return self.PORT_START
PLATFORM = "x86_64-cel_e1031-r0"
HWSKU = "Celestica-E1031-T48S4"
@property
def port_end(self):
return self.PORT_END
def __init__(self, sfp_index):
# Init index
self.index = sfp_index
self.port_num = self.index + 1
@property
def qsfp_ports(self):
return []
# Init eeprom path
eeprom_path = '/sys/bus/i2c/devices/i2c-{0}/{0}-0050/eeprom'
self.port_to_eeprom_mapping = {}
for x in range(self.PORT_START, self.PORT_END + 1):
if x not in self._sfp_port:
self.port_to_i2c_mapping[x] = None
self.port_to_eeprom_mapping[x] = eeprom_path.format(
self.port_to_i2c_mapping[x])
@property
def port_to_eeprom_mapping(self):
return self._port_to_eeprom_mapping
self.info_dict_keys = ['type', 'hardwarerev', 'serialnum', 'manufacturename', 'modelname', 'Connector', 'encoding', 'ext_identifier',
'ext_rateselect_compliance', 'cable_type', 'cable_length', 'nominal_bit_rate', 'specification_compliance', 'vendor_date', 'vendor_oui']
self.dom_dict_keys = ['rx_los', 'tx_fault', 'reset_status', 'power_lpmode', 'tx_disable', 'tx_disable_channel', 'temperature', 'voltage',
'rx1power', 'rx2power', 'rx3power', 'rx4power', 'tx1bias', 'tx2bias', 'tx3bias', 'tx4bias', 'tx1power', 'tx2power', 'tx3power', 'tx4power']
self.threshold_dict_keys = ['temphighalarm', 'temphighwarning', 'templowalarm', 'templowwarning', 'vcchighalarm', 'vcchighwarning', 'vcclowalarm', 'vcclowwarning', 'rxpowerhighalarm', 'rxpowerhighwarning',
'rxpowerlowalarm', 'rxpowerlowwarning', 'txpowerhighalarm', 'txpowerhighwarning', 'txpowerlowalarm', 'txpowerlowwarning', 'txbiashighalarm', 'txbiashighwarning', 'txbiaslowalarm', 'txbiaslowwarning']
SfpBase.__init__(self)
def _convert_string_to_num(self, value_str):
if "-inf" in value_str:
@ -102,49 +136,46 @@ class Sfp(SfpBase, SfpUtilBase):
else:
return 'N/A'
def get_low_power_mode(self, port_num):
raise NotImplementedError
def set_low_power_mode(self, port_num, lpmode):
raise NotImplementedError
def get_transceiver_change_event(self, timeout=0):
raise NotImplementedError
def __init__(self, sfp_index):
# Init SfpUtilBase
eeprom_path = '/sys/bus/i2c/devices/i2c-{0}/{0}-0050/eeprom'
for x in range(self.PORT_START, self.PORT_END + 1):
if x not in self._sfp_port:
self.port_to_i2c_mapping[x] = None
self.port_to_eeprom_mapping[x] = eeprom_path.format(
self.port_to_i2c_mapping[x])
self.read_porttab_mappings(self.__get_path_to_port_config_file())
SfpUtilBase.__init__(self)
# Init index
self.index = sfp_index
self.port_num = self.index + 1
def __read_txt_file(self, file_path):
try:
with open(file_path, 'r') as fd:
data = fd.read()
return data.strip()
except IOError:
pass
return ""
def __is_host(self):
return os.system(self.HOST_CHK_CMD) == 0
def __get_sysfsfile_eeprom(self):
sysfsfile_eeprom = None
sysfs_sfp_i2c_client_eeprom_path = self.port_to_eeprom_mapping[self.port_num]
try:
sysfsfile_eeprom = open(
sysfs_sfp_i2c_client_eeprom_path, mode="r+b", buffering=0)
except IOError:
pass
return sysfsfile_eeprom
def __get_path_to_port_config_file(self):
platform_path = "/".join([self.PLATFORM_ROOT_PATH, self.PLATFORM])
hwsku_path = "/".join([platform_path, self.HWSKU]
) if self.__is_host() else self.PMON_HWSKU_PATH
return "/".join([hwsku_path, "port_config.ini"])
def __read_eeprom_specific_bytes(self, offset, num_bytes):
sysfsfile_eeprom = None
eeprom_raw = []
for i in range(0, num_bytes):
eeprom_raw.append("0x00")
sysfs_sfp_i2c_client_eeprom_path = self.port_to_eeprom_mapping[self.port_num]
try:
sysfsfile_eeprom = open(
sysfs_sfp_i2c_client_eeprom_path, mode="rb", buffering=0)
sysfsfile_eeprom.seek(offset)
raw = sysfsfile_eeprom.read(num_bytes)
for n in range(0, num_bytes):
eeprom_raw[n] = hex(ord(raw[n]))[2:].zfill(2)
except:
pass
finally:
if sysfsfile_eeprom:
sysfsfile_eeprom.close()
return eeprom_raw
def get_transceiver_info(self):
"""
Retrieves transceiver info of this SFP
@ -169,44 +200,84 @@ class Sfp(SfpBase, SfpUtilBase):
vendor_oui |1*255VCHAR |vendor OUI
========================================================================
"""
transceiver_info_dict = dict()
# get eeprom data
self.eeprom_dict = self.get_eeprom_dict(self.port_num)
if self.eeprom_dict and self.eeprom_dict.get('interface'):
transceiver_info_data = self.eeprom_dict['interface'].get('data')
# check present status
sfpi_obj = sff8472InterfaceId()
if not self.get_presence() or not sfpi_obj:
return {}
# set specification_compliance
spec_com = transceiver_info_data.get(
self.SFP_EEPROM_SPEC_COM_KEY, {})
spec_com_str = "/".join(list(spec_com.values()))
offset = INFO_OFFSET
# set normal transceiver info
transceiver_info_dict['type'] = transceiver_info_data.get(
self.SFP_EEPROM_TYPE_KEY, 'N/A')
transceiver_info_dict['hardwarerev'] = transceiver_info_data.get(
self.SFP_EEPROM_HW_REV_KEY, 'N/A')
transceiver_info_dict['manufacturename'] = transceiver_info_data.get(
self.SFP_EEPROM_MF_NAME_KEY, 'N/A')
transceiver_info_dict['modelname'] = transceiver_info_data.get(
self.SFP_EEPROM_MODEL_NAME_KEY, 'N/A')
transceiver_info_dict['serialnum'] = transceiver_info_data.get(
self.SFP_EEPROM_SERIAL_KEY, 'N/A')
transceiver_info_dict['Connector'] = transceiver_info_data.get(
self.SFP_EEPROM_CONNECTOR_KEY, 'N/A')
transceiver_info_dict['encoding'] = transceiver_info_data.get(
self.SFP_EEPROM_ENCODE_KEY, 'N/A')
transceiver_info_dict['ext_identifier'] = transceiver_info_data.get(
self.SFP_EEPROM_EXT_IDENT_KEY, 'N/A')
transceiver_info_dict['cable_length'] = transceiver_info_data.get(
self.SFP_EEPROM_CABLE_KEY, 'N/A')
transceiver_info_dict['nominal_bit_rate'] = transceiver_info_data.get(
self.SFP_EEPROM_BIT_RATE_KEY, 'N/A')
transceiver_info_dict['vendor_date'] = transceiver_info_data.get(
self.SFP_EEPROM_DATE_KEY, 'N/A')
transceiver_info_dict['vendor_oui'] = transceiver_info_data.get(
self.SFP_EEPROM_OUI_KEY, 'N/A')
transceiver_info_dict['ext_rateselect_compliance'] = "N/A"
transceiver_info_dict['specification_compliance'] = spec_com_str or "N/A"
sfp_interface_bulk_raw = self.__read_eeprom_specific_bytes(
(offset + XCVR_INTFACE_BULK_OFFSET), XCVR_INTFACE_BULK_WIDTH_SFP)
sfp_interface_bulk_data = sfpi_obj.parse_sfp_info_bulk(
sfp_interface_bulk_raw, 0)
sfp_vendor_name_raw = self.__read_eeprom_specific_bytes(
(offset + XCVR_VENDOR_NAME_OFFSET), XCVR_VENDOR_NAME_WIDTH)
sfp_vendor_name_data = sfpi_obj.parse_vendor_name(
sfp_vendor_name_raw, 0)
sfp_vendor_pn_raw = self.__read_eeprom_specific_bytes(
(offset + XCVR_VENDOR_PN_OFFSET), XCVR_VENDOR_PN_WIDTH)
sfp_vendor_pn_data = sfpi_obj.parse_vendor_pn(
sfp_vendor_pn_raw, 0)
sfp_vendor_rev_raw = self.__read_eeprom_specific_bytes(
(offset + XCVR_HW_REV_OFFSET), XCVR_HW_REV_WIDTH_SFP)
sfp_vendor_rev_data = sfpi_obj.parse_vendor_rev(
sfp_vendor_rev_raw, 0)
sfp_vendor_sn_raw = self.__read_eeprom_specific_bytes(
(offset + XCVR_VENDOR_SN_OFFSET), XCVR_VENDOR_SN_WIDTH)
sfp_vendor_sn_data = sfpi_obj.parse_vendor_sn(
sfp_vendor_sn_raw, 0)
sfp_vendor_oui_raw = self.__read_eeprom_specific_bytes(
(offset + XCVR_VENDOR_OUI_OFFSET), XCVR_VENDOR_OUI_WIDTH)
if sfp_vendor_oui_raw is not None:
sfp_vendor_oui_data = sfpi_obj.parse_vendor_oui(
sfp_vendor_oui_raw, 0)
sfp_vendor_date_raw = self.__read_eeprom_specific_bytes(
(offset + XCVR_VENDOR_DATE_OFFSET), XCVR_VENDOR_DATE_WIDTH)
sfp_vendor_date_data = sfpi_obj.parse_vendor_date(
sfp_vendor_date_raw, 0)
transceiver_info_dict = dict.fromkeys(self.info_dict_keys, 'N/A')
compliance_code_dict = dict()
if sfp_interface_bulk_data:
transceiver_info_dict['type'] = sfp_interface_bulk_data['data']['type']['value']
transceiver_info_dict['Connector'] = sfp_interface_bulk_data['data']['Connector']['value']
transceiver_info_dict['encoding'] = sfp_interface_bulk_data['data']['EncodingCodes']['value']
transceiver_info_dict['ext_identifier'] = sfp_interface_bulk_data['data']['Extended Identifier']['value']
transceiver_info_dict['ext_rateselect_compliance'] = sfp_interface_bulk_data['data']['RateIdentifier']['value']
transceiver_info_dict['type_abbrv_name'] = sfp_interface_bulk_data['data']['type_abbrv_name']['value']
transceiver_info_dict['manufacturename'] = sfp_vendor_name_data[
'data']['Vendor Name']['value'] if sfp_vendor_name_data else 'N/A'
transceiver_info_dict['modelname'] = sfp_vendor_pn_data['data']['Vendor PN']['value'] if sfp_vendor_pn_data else 'N/A'
transceiver_info_dict['hardwarerev'] = sfp_vendor_rev_data['data']['Vendor Rev']['value'] if sfp_vendor_rev_data else 'N/A'
transceiver_info_dict['serialnum'] = sfp_vendor_sn_data['data']['Vendor SN']['value'] if sfp_vendor_sn_data else 'N/A'
transceiver_info_dict['vendor_oui'] = sfp_vendor_oui_data['data']['Vendor OUI']['value'] if sfp_vendor_oui_data else 'N/A'
transceiver_info_dict['vendor_date'] = sfp_vendor_date_data[
'data']['VendorDataCode(YYYY-MM-DD Lot)']['value'] if sfp_vendor_date_data else 'N/A'
transceiver_info_dict['cable_type'] = "Unknown"
transceiver_info_dict['cable_length'] = "Unknown"
for key in sfp_cable_length_tup:
if key in sfp_interface_bulk_data['data']:
transceiver_info_dict['cable_type'] = key
transceiver_info_dict['cable_length'] = str(
sfp_interface_bulk_data['data'][key]['value'])
for key in sfp_compliance_code_tup:
if key in sfp_interface_bulk_data['data']['Specification compliance']['value']:
compliance_code_dict[key] = sfp_interface_bulk_data['data']['Specification compliance']['value'][key]['value']
transceiver_info_dict['specification_compliance'] = str(
compliance_code_dict)
transceiver_info_dict['nominal_bit_rate'] = str(
sfp_interface_bulk_data['data']['NominalSignallingRate(UnitsOf100Mbd)']['value'])
return transceiver_info_dict
@ -218,60 +289,146 @@ class Sfp(SfpBase, SfpUtilBase):
========================================================================
keys |Value Format |Information
---------------------------|---------------|----------------------------
RX LOS |BOOLEAN |RX lost-of-signal status,
| |True if has RX los, False if not.
TX FAULT |BOOLEAN |TX fault status,
| |True if has TX fault, False if not.
Reset status |BOOLEAN |reset status,
| |True if SFP in reset, False if not.
LP mode |BOOLEAN |low power mode status,
| |True in lp mode, False if not.
TX disable |BOOLEAN |TX disable status,
| |True TX disabled, False if not.
TX disabled channel |HEX |disabled TX channles in hex,
| |bits 0 to 3 represent channel 0
rx_los |BOOLEAN |RX loss-of-signal status, True if has RX los, False if not.
tx_fault |BOOLEAN |TX fault status, True if has TX fault, False if not.
reset_status |BOOLEAN |reset status, True if SFP in reset, False if not.
lp_mode |BOOLEAN |low power mode status, True in lp mode, False if not.
tx_disable |BOOLEAN |TX disable status, True TX disabled, False if not.
tx_disabled_channel |HEX |disabled TX channels in hex, bits 0 to 3 represent channel 0
| |to channel 3.
Temperature |INT |module temperature in Celsius
Voltage |INT |supply voltage in mV
TX bias |INT |TX Bias Current in mA
RX power |INT |received optical power in mW
TX power |INT |TX output power in mW
temperature |INT |module temperature in Celsius
voltage |INT |supply voltage in mV
tx<n>bias |INT |TX Bias Current in mA, n is the channel number,
| |for example, tx2bias stands for tx bias of channel 2.
rx<n>power |INT |received optical power in mW, n is the channel number,
| |for example, rx2power stands for rx power of channel 2.
tx<n>power |INT |TX output power in mW, n is the channel number,
| |for example, tx2power stands for tx power of channel 2.
========================================================================
"""
transceiver_bulk_status_dict = dict()
# get eeprom data
self.eeprom_dict = self.get_eeprom_dict(self.port_num)
if self.eeprom_dict and self.eeprom_dict.get('dom'):
transceiver_dom_data = self.eeprom_dict['dom'].get('data', {})
transceiver_dom_data_mmv = transceiver_dom_data.get(
self.SFP_EEPROM_MON_DATA_KEY)
# check present status
sfpd_obj = sff8472Dom()
if not self.get_presence() or not sfpd_obj:
return {}
# set normal transceiver bulk status
transceiver_bulk_status_dict['temperature'] = transceiver_dom_data_mmv.get(
self.SFP_EEPROM_TEMP_KEY, 'N/A')
transceiver_bulk_status_dict['voltage'] = transceiver_dom_data_mmv.get(
self.SFP_EEPROM_VCC_KEY, 'N/A')
transceiver_bulk_status_dict['rx1power'] = transceiver_dom_data_mmv.get(
self.SFP_EEPROM_RX_PWR_KEY, 'N/A')
transceiver_bulk_status_dict['rx2power'] = "N/A"
transceiver_bulk_status_dict['rx3power'] = "N/A"
transceiver_bulk_status_dict['rx4power'] = "N/A"
transceiver_bulk_status_dict['tx1bias'] = transceiver_dom_data_mmv.get(
self.SFP_EEPROM_TX_BS_KEY, 'N/A')
transceiver_bulk_status_dict['tx2bias'] = "N/A"
transceiver_bulk_status_dict['tx3bias'] = "N/A"
transceiver_bulk_status_dict['tx4bias'] = "N/A"
transceiver_bulk_status_dict['tx1power'] = transceiver_dom_data_mmv.get(
self.SFP_EEPROM_TX_PWR_KEY, 'N/A')
transceiver_bulk_status_dict['tx2power'] = "N/A"
transceiver_bulk_status_dict['tx3power'] = "N/A"
transceiver_bulk_status_dict['tx4power'] = "N/A"
eeprom_ifraw = self.__read_eeprom_specific_bytes(0, DOM_OFFSET)
sfpi_obj = sff8472InterfaceId(eeprom_ifraw)
cal_type = sfpi_obj.get_calibration_type()
sfpd_obj._calibration_type = cal_type
for key in transceiver_bulk_status_dict:
transceiver_bulk_status_dict[key] = self._convert_string_to_num(
transceiver_bulk_status_dict[key])
offset = DOM_OFFSET
transceiver_dom_info_dict = dict.fromkeys(self.dom_dict_keys, 'N/A')
dom_temperature_raw = self.__read_eeprom_specific_bytes(
(offset + SFP_TEMPE_OFFSET), SFP_TEMPE_WIDTH)
return transceiver_bulk_status_dict
if dom_temperature_raw is not None:
dom_temperature_data = sfpd_obj.parse_temperature(
dom_temperature_raw, 0)
transceiver_dom_info_dict['temperature'] = dom_temperature_data['data']['Temperature']['value']
dom_voltage_raw = self.__read_eeprom_specific_bytes(
(offset + SFP_VOLT_OFFSET), SFP_VOLT_WIDTH)
if dom_voltage_raw is not None:
dom_voltage_data = sfpd_obj.parse_voltage(dom_voltage_raw, 0)
transceiver_dom_info_dict['voltage'] = dom_voltage_data['data']['Vcc']['value']
dom_channel_monitor_raw = self.__read_eeprom_specific_bytes(
(offset + SFP_CHANNL_MON_OFFSET), SFP_CHANNL_MON_WIDTH)
if dom_channel_monitor_raw is not None:
dom_voltage_data = sfpd_obj.parse_channel_monitor_params(
dom_channel_monitor_raw, 0)
transceiver_dom_info_dict['tx1power'] = dom_voltage_data['data']['TXPower']['value']
transceiver_dom_info_dict['rx1power'] = dom_voltage_data['data']['RXPower']['value']
transceiver_dom_info_dict['tx1bias'] = dom_voltage_data['data']['TXBias']['value']
for key in transceiver_dom_info_dict:
transceiver_dom_info_dict[key] = self._convert_string_to_num(
transceiver_dom_info_dict[key])
transceiver_dom_info_dict['rx_los'] = self.get_rx_los()
transceiver_dom_info_dict['tx_fault'] = self.get_tx_fault()
transceiver_dom_info_dict['reset_status'] = self.get_reset_status()
transceiver_dom_info_dict['lp_mode'] = self.get_lpmode()
return transceiver_dom_info_dict
def get_transceiver_threshold_info(self):
"""
Retrieves transceiver threshold info of this SFP
Returns:
A dict which contains following keys/values :
========================================================================
keys |Value Format |Information
---------------------------|---------------|----------------------------
temphighalarm |FLOAT |High Alarm Threshold value of temperature in Celsius.
templowalarm |FLOAT |Low Alarm Threshold value of temperature in Celsius.
temphighwarning |FLOAT |High Warning Threshold value of temperature in Celsius.
templowwarning |FLOAT |Low Warning Threshold value of temperature in Celsius.
vcchighalarm |FLOAT |High Alarm Threshold value of supply voltage in mV.
vcclowalarm |FLOAT |Low Alarm Threshold value of supply voltage in mV.
vcchighwarning |FLOAT |High Warning Threshold value of supply voltage in mV.
vcclowwarning |FLOAT |Low Warning Threshold value of supply voltage in mV.
rxpowerhighalarm |FLOAT |High Alarm Threshold value of received power in dBm.
rxpowerlowalarm |FLOAT |Low Alarm Threshold value of received power in dBm.
rxpowerhighwarning |FLOAT |High Warning Threshold value of received power in dBm.
rxpowerlowwarning |FLOAT |Low Warning Threshold value of received power in dBm.
txpowerhighalarm |FLOAT |High Alarm Threshold value of transmit power in dBm.
txpowerlowalarm |FLOAT |Low Alarm Threshold value of transmit power in dBm.
txpowerhighwarning |FLOAT |High Warning Threshold value of transmit power in dBm.
txpowerlowwarning |FLOAT |Low Warning Threshold value of transmit power in dBm.
txbiashighalarm |FLOAT |High Alarm Threshold value of tx Bias Current in mA.
txbiaslowalarm |FLOAT |Low Alarm Threshold value of tx Bias Current in mA.
txbiashighwarning |FLOAT |High Warning Threshold value of tx Bias Current in mA.
txbiaslowwarning |FLOAT |Low Warning Threshold value of tx Bias Current in mA.
========================================================================
"""
# check present status
sfpd_obj = sff8472Dom()
if not self.get_presence() and not sfpd_obj:
return {}
eeprom_ifraw = self.__read_eeprom_specific_bytes(0, DOM_OFFSET)
sfpi_obj = sff8472InterfaceId(eeprom_ifraw)
cal_type = sfpi_obj.get_calibration_type()
sfpd_obj._calibration_type = cal_type
offset = DOM_OFFSET
transceiver_dom_threshold_info_dict = dict.fromkeys(
self.threshold_dict_keys, 'N/A')
dom_module_threshold_raw = self.__read_eeprom_specific_bytes(
(offset + SFP_MODULE_THRESHOLD_OFFSET), SFP_MODULE_THRESHOLD_WIDTH)
if dom_module_threshold_raw is not None:
dom_module_threshold_data = sfpd_obj.parse_alarm_warning_threshold(
dom_module_threshold_raw, 0)
transceiver_dom_threshold_info_dict['temphighalarm'] = dom_module_threshold_data['data']['TempHighAlarm']['value']
transceiver_dom_threshold_info_dict['templowalarm'] = dom_module_threshold_data['data']['TempLowAlarm']['value']
transceiver_dom_threshold_info_dict['temphighwarning'] = dom_module_threshold_data['data']['TempHighWarning']['value']
transceiver_dom_threshold_info_dict['templowwarning'] = dom_module_threshold_data['data']['TempLowWarning']['value']
transceiver_dom_threshold_info_dict['vcchighalarm'] = dom_module_threshold_data['data']['VoltageHighAlarm']['value']
transceiver_dom_threshold_info_dict['vcclowalarm'] = dom_module_threshold_data['data']['VoltageLowAlarm']['value']
transceiver_dom_threshold_info_dict['vcchighwarning'] = dom_module_threshold_data[
'data']['VoltageHighWarning']['value']
transceiver_dom_threshold_info_dict['vcclowwarning'] = dom_module_threshold_data['data']['VoltageLowWarning']['value']
transceiver_dom_threshold_info_dict['txbiashighalarm'] = dom_module_threshold_data['data']['BiasHighAlarm']['value']
transceiver_dom_threshold_info_dict['txbiaslowalarm'] = dom_module_threshold_data['data']['BiasLowAlarm']['value']
transceiver_dom_threshold_info_dict['txbiashighwarning'] = dom_module_threshold_data['data']['BiasHighWarning']['value']
transceiver_dom_threshold_info_dict['txbiaslowwarning'] = dom_module_threshold_data['data']['BiasLowWarning']['value']
transceiver_dom_threshold_info_dict['txpowerhighalarm'] = dom_module_threshold_data['data']['TXPowerHighAlarm']['value']
transceiver_dom_threshold_info_dict['txpowerlowalarm'] = dom_module_threshold_data['data']['TXPowerLowAlarm']['value']
transceiver_dom_threshold_info_dict['txpowerhighwarning'] = dom_module_threshold_data['data']['TXPowerHighWarning']['value']
transceiver_dom_threshold_info_dict['txpowerlowwarning'] = dom_module_threshold_data['data']['TXPowerLowWarning']['value']
transceiver_dom_threshold_info_dict['rxpowerhighalarm'] = dom_module_threshold_data['data']['RXPowerHighAlarm']['value']
transceiver_dom_threshold_info_dict['rxpowerlowalarm'] = dom_module_threshold_data['data']['RXPowerLowAlarm']['value']
transceiver_dom_threshold_info_dict['rxpowerhighwarning'] = dom_module_threshold_data['data']['RXPowerHighWarning']['value']
transceiver_dom_threshold_info_dict['rxpowerlowwarning'] = dom_module_threshold_data['data']['RXPowerLowWarning']['value']
for key in transceiver_dom_threshold_info_dict:
transceiver_dom_threshold_info_dict[key] = self._convert_string_to_num(
transceiver_dom_threshold_info_dict[key])
return transceiver_dom_threshold_info_dict
def get_reset_status(self):
"""
@ -280,7 +437,7 @@ class Sfp(SfpBase, SfpUtilBase):
A Boolean, True if reset enabled, False if disabled
"""
# SFP doesn't support this feature
return NotImplementedError
return False
def get_rx_los(self):
"""
@ -290,14 +447,12 @@ class Sfp(SfpBase, SfpUtilBase):
Note : RX LOS status is latched until a call to get_rx_los or a reset.
"""
rx_los = False
rx_los_key = "RXLOSState"
self.eeprom_dict = self.get_eeprom_dict(self.port_num)
if self.eeprom_dict and self.eeprom_dict.get('dom'):
transceiver_dom_data = self.eeprom_dict['dom'].get('data', {})
transceiver_dom_data_sc = transceiver_dom_data.get(
self.SFP_EEPROM_STATUS_CON_KEY)
state = transceiver_dom_data_sc.get(rx_los_key)
rx_los = True if 'off' not in state.lower() else False
status_control_raw = self.__read_eeprom_specific_bytes(
SFP_STATUS_CONTROL_OFFSET, SFP_STATUS_CONTROL_WIDTH)
if status_control_raw:
data = int(status_control_raw[0], 16)
rx_los = (sffbase().test_bit(data, 1) != 0)
return rx_los
def get_tx_fault(self):
@ -308,14 +463,12 @@ class Sfp(SfpBase, SfpUtilBase):
Note : TX fault status is lached until a call to get_tx_fault or a reset.
"""
tx_fault = False
tx_fault_key = "TXFaultState"
self.eeprom_dict = self.get_eeprom_dict(self.port_num)
if self.eeprom_dict and self.eeprom_dict.get('dom'):
transceiver_dom_data = self.eeprom_dict['dom'].get('data', {})
transceiver_dom_data_sc = transceiver_dom_data.get(
self.SFP_EEPROM_STATUS_CON_KEY)
state = transceiver_dom_data_sc.get(tx_fault_key)
tx_fault = True if 'off' not in state.lower() else False
status_control_raw = self.__read_eeprom_specific_bytes(
SFP_STATUS_CONTROL_OFFSET, SFP_STATUS_CONTROL_WIDTH)
if status_control_raw:
data = int(status_control_raw[0], 16)
tx_fault = (sffbase().test_bit(data, 2) != 0)
return tx_fault
def get_tx_disable(self):
@ -325,14 +478,17 @@ class Sfp(SfpBase, SfpUtilBase):
A Boolean, True if tx_disable is enabled, False if disabled
"""
tx_disable = False
tx_disable_key = "TXDisableState"
self.eeprom_dict = self.get_eeprom_dict(self.port_num)
if self.eeprom_dict and self.eeprom_dict.get('dom'):
transceiver_dom_data = self.eeprom_dict['dom'].get('data', {})
transceiver_dom_data_sc = transceiver_dom_data.get(
self.SFP_EEPROM_STATUS_CON_KEY)
state = transceiver_dom_data_sc.get(tx_disable_key)
tx_disable = True if 'off' not in state.lower() else False
tx_fault = False
status_control_raw = self.__read_eeprom_specific_bytes(
SFP_STATUS_CONTROL_OFFSET, SFP_STATUS_CONTROL_WIDTH)
if status_control_raw:
data = int(status_control_raw[0], 16)
tx_disable_hard = (sffbase().test_bit(
data, SFP_TX_DISABLE_HARD_BIT) != 0)
tx_disable_soft = (sffbase().test_bit(
data, SFP_TX_DISABLE_SOFT_BIT) != 0)
tx_disable = tx_disable_hard | tx_disable_soft
return tx_disable
def get_tx_disable_channel(self):
@ -345,7 +501,7 @@ class Sfp(SfpBase, SfpUtilBase):
and channel 2 have been disabled.
"""
# SFP doesn't support this feature
return NotImplementedError
return 0
def get_lpmode(self):
"""
@ -354,7 +510,7 @@ class Sfp(SfpBase, SfpUtilBase):
A Boolean, True if lpmode is enabled, False if disabled
"""
# SFP doesn't support this feature
return self.get_low_power_mode(self.port_num)
return False
def get_power_override(self):
"""
@ -363,7 +519,7 @@ class Sfp(SfpBase, SfpUtilBase):
A Boolean, True if power-override is enabled, False if disabled
"""
# SFP doesn't support this feature
return NotImplementedError
return False
def get_temperature(self):
"""
@ -426,7 +582,7 @@ class Sfp(SfpBase, SfpUtilBase):
A boolean, True if successful, False if not
"""
# SFP doesn't support this feature
return NotImplementedError
return False
def tx_disable(self, tx_disable):
"""
@ -437,26 +593,29 @@ class Sfp(SfpBase, SfpUtilBase):
Returns:
A boolean, True if tx_disable is set successfully, False if not
"""
sysfsfile_eeprom = self.__get_sysfsfile_eeprom()
status_control_raw = self._read_eeprom_specific_bytes(
sysfsfile_eeprom, self.SFP_STATUS_CONTROL_OFFSET, self.SFP_STATUS_CONTROL_WIDTH) if sysfsfile_eeprom else None
sysfs_sfp_i2c_client_eeprom_path = self.port_to_eeprom_mapping[self.port_num]
status_control_raw = self.__read_eeprom_specific_bytes(
SFP_STATUS_CONTROL_OFFSET, SFP_STATUS_CONTROL_WIDTH)
if status_control_raw is not None:
tx_disable_bit = 0x80 if tx_disable else 0x7f
# Set bit 6 for Soft TX Disable Select
# 01000000 = 64 and 10111111 = 191
tx_disable_bit = 64 if tx_disable else 191
status_control = int(status_control_raw[0], 16)
tx_disable_ctl = (status_control | tx_disable_bit) if tx_disable else (
status_control & tx_disable_bit)
try:
sysfsfile_eeprom = open(
sysfs_sfp_i2c_client_eeprom_path, mode="r+b", buffering=0)
buffer = create_string_buffer(1)
buffer[0] = chr(tx_disable_ctl)
# Write to eeprom
sysfsfile_eeprom.seek(self.SFP_STATUS_CONTROL_OFFSET)
sysfsfile_eeprom.seek(SFP_STATUS_CONTROL_OFFSET)
sysfsfile_eeprom.write(buffer[0])
except IOError as e:
print "Error: unable to open file: %s" % str(e)
except:
#print("Error: unable to open file: %s" % str(e))
return False
finally:
if sysfsfile_eeprom is not None:
if sysfsfile_eeprom:
sysfsfile_eeprom.close()
time.sleep(0.01)
return True
@ -474,7 +633,7 @@ class Sfp(SfpBase, SfpUtilBase):
A boolean, True if successful, False if not
"""
# SFP doesn't support this feature
return NotImplementedError
return False
def set_lpmode(self, lpmode):
"""
@ -485,7 +644,8 @@ class Sfp(SfpBase, SfpUtilBase):
Returns:
A boolean, True if lpmode is set successfully, False if not
"""
return self.set_low_power_mode(self.port_num, lpmode)
# SFP doesn't support this feature
return False
def set_power_override(self, power_override, power_set):
"""
@ -504,7 +664,8 @@ class Sfp(SfpBase, SfpUtilBase):
A boolean, True if power-override and power_set are set successfully,
False if not
"""
return NotImplementedError
# SFP doesn't support this feature
return False
def get_name(self):
"""
@ -512,7 +673,11 @@ class Sfp(SfpBase, SfpUtilBase):
Returns:
string: The name of the device
"""
return self.logical[self.index]
sfputil_helper = SfpUtilHelper()
sfputil_helper.read_porttab_mappings(
self.__get_path_to_port_config_file())
name = sfputil_helper.logical[self.index] or "Unknown"
return name
def get_presence(self):
"""

View File

@ -26,7 +26,8 @@ try:
except ImportError as e:
raise ImportError(str(e) + "- required module not found")
NUM_FAN = 5
NUM_FAN_TRAY = 5
NUM_FAN = 2
NUM_PSU = 2
NUM_THERMAL = 5
NUM_SFP = 32
@ -44,9 +45,10 @@ class Chassis(ChassisBase):
def __init__(self):
self.config_data = {}
for index in range(0, NUM_FAN):
fan = Fan(index)
self._fan_list.append(fan)
for fant_index in range(0, NUM_FAN_TRAY):
for fan_index in range(0, NUM_FAN):
fan = Fan(fant_index, fan_index)
self._fan_list.append(fan)
for index in range(0, NUM_PSU):
psu = Psu(index)
self._psu_list.append(psu)

View File

@ -18,27 +18,46 @@ except ImportError as e:
raise ImportError(str(e) + "- required module not found")
EMC2305_PATH = "/sys/bus/i2c/drivers/emc2305/"
SYS_GPIO_DIR = "/sys/class/gpio"
GPIO_DIR = "/sys/class/gpio"
GPIO_LABEL = "pca9505"
EMC2305_MAX_PWM = 255
EMC2305_FAN_PWM = "pwm{}"
EMC2305_FAN_TARGET = "fan{}_target"
EMC2305_FAN_INPUT = "pwm{}"
FAN_NAME_LIST = ["FAN-1", "FAN-2", "FAN-3", "FAN-4", "FAN-5"]
FAN_NAME_LIST = ["FAN-1F", "FAN-1R", "FAN-2F", "FAN-2R",
"FAN-3F", "FAN-3R", "FAN-4F", "FAN-4R", "FAN-5F", "FAN-5R"]
PSU_FAN_MAX_RPM = 11000
PSU_HWMON_PATH = "/sys/bus/i2c/devices/i2c-{0}/{0}-00{1}/hwmon"
PSU_I2C_MAPPING = {
0: {
"num": 10,
"addr": "5a"
},
1: {
"num": 11,
"addr": "5b"
},
}
class Fan(FanBase):
"""Platform-specific Fan class"""
def __init__(self, fan_index):
self.index = fan_index
self.config_data = {}
self.fan_speed = 0
FanBase.__init__(self)
def __init__(self, fan_tray_index, fan_index=0, is_psu_fan=False, psu_index=0):
self.fan_index = fan_index
self.fan_tray_index = fan_tray_index
self.is_psu_fan = is_psu_fan
if self.is_psu_fan:
self.psu_index = psu_index
self.psu_i2c_num = PSU_I2C_MAPPING[self.psu_index]["num"]
self.psu_i2c_addr = PSU_I2C_MAPPING[self.psu_index]["addr"]
self.psu_hwmon_path = PSU_HWMON_PATH.format(
self.psu_i2c_num, self.psu_i2c_addr)
# dx010 fan attributes
# Two EMC2305s located at i2c-13-4d and i2c-13-2e
# to control a dual-fan module.
self.dx010_emc2305_chip = [
self.emc2305_chip_mapping = [
{
'device': "13-002e",
'index_map': [2, 1, 4, 5, 3]
@ -48,121 +67,133 @@ class Fan(FanBase):
'index_map': [2, 4, 5, 3, 1]
}
]
self.dx010_fan_gpio = [
{'base': self.get_gpio_base()},
{'prs': 10, 'dir': 15, 'color': {'red': 31, 'green': 32}},
{'prs': 11, 'dir': 16, 'color': {'red': 29, 'green': 30}},
{'prs': 12, 'dir': 17, 'color': {'red': 35, 'green': 36}},
{'prs': 13, 'dir': 18, 'color': {'red': 37, 'green': 38}},
{'prs': 14, 'dir': 19, 'color': {'red': 33, 'green': 34}},
{'base': self.__get_gpio_base()},
{'prs': 11, 'dir': 16, 'color': {'red': 31, 'green': 32}}, # 1
{'prs': 10, 'dir': 15, 'color': {'red': 29, 'green': 30}}, # 2
{'prs': 13, 'dir': 18, 'color': {'red': 35, 'green': 36}}, # 3
{'prs': 14, 'dir': 19, 'color': {'red': 37, 'green': 38}}, # 4
{'prs': 12, 'dir': 17, 'color': {'red': 33, 'green': 34}}, # 5
]
FanBase.__init__(self)
def get_gpio_base(self):
for r in os.listdir(SYS_GPIO_DIR):
if "gpiochip" in r:
def __read_txt_file(self, file_path):
try:
with open(file_path, 'r') as fd:
data = fd.read()
return data.strip()
except IOError:
pass
return ""
def __write_txt_file(self, file_path, value):
try:
with open(file_path, 'w') as fd:
fd.write(str(value))
except:
return False
return True
def __search_file_by_name(self, directory, file_name):
for dirpath, dirnames, files in os.walk(directory):
for name in files:
file_path = os.path.join(dirpath, name)
if name in file_name:
return file_path
return None
def __get_gpio_base(self):
for r in os.listdir(GPIO_DIR):
label_path = os.path.join(GPIO_DIR, r, "label")
if "gpiochip" in r and GPIO_LABEL in self.__read_txt_file(label_path):
return int(r[8:], 10)
return 216 # Reserve
def get_gpio_value(self, pinnum):
def __get_gpio_value(self, pinnum):
gpio_base = self.dx010_fan_gpio[0]['base']
gpio_dir = SYS_GPIO_DIR + '/gpio' + str(gpio_base+pinnum)
gpio_dir = GPIO_DIR + '/gpio' + str(gpio_base+pinnum)
gpio_file = gpio_dir + "/value"
retval = self.__read_txt_file(gpio_file)
return retval.rstrip('\r\n')
try:
with open(gpio_file, 'r') as fd:
retval = fd.read()
except IOError:
raise IOError("Unable to open " + gpio_file + "file !")
retval = retval.rstrip('\r\n')
return retval
def set_gpio_value(self, pinnum, value=0):
def __set_gpio_value(self, pinnum, value=0):
gpio_base = self.dx010_fan_gpio[0]['base']
gpio_dir = SYS_GPIO_DIR + '/gpio' + str(gpio_base+pinnum)
gpio_dir = GPIO_DIR + '/gpio' + str(gpio_base+pinnum)
gpio_file = gpio_dir + "/value"
try:
with open(gpio_file, 'w') as fd:
retval = fd.write(str(value))
except IOError:
raise IOError("Unable to open " + gpio_file + "file !")
return self.__write_txt_file(gpio_file, value)
def get_direction(self):
"""
Retrieves the direction of fan
Returns:
A string, either FAN_DIRECTION_INTAKE or FAN_DIRECTION_EXHAUST
depending on fan direction
"""
direction = self.FAN_DIRECTION_EXHAUST
if not self.is_psu_fan:
raw = self.__get_gpio_value(
self.dx010_fan_gpio[self.fan_tray_index+1]['dir'])
direction = self.FAN_DIRECTION_INTAKE
raw = self.get_gpio_value(self.dx010_fan_gpio[self.index+1]['dir'])
if int(raw, 10) == 0:
direction = self.FAN_DIRECTION_INTAKE
else:
direction = self.FAN_DIRECTION_EXHAUST
direction = self.FAN_DIRECTION_INTAKE if int(
raw, 10) == 0 else self.FAN_DIRECTION_EXHAUST
return direction
def get_speed(self):
"""
DX010 platform specific data:
Retrieves the speed of fan as a percentage of full speed
Returns:
An integer, the percentage of full fan speed, in the range 0 (off)
to 100 (full speed)
Note:
speed = pwm_in/255*100
"""
# TODO: Seperate PSU's fan and main fan class
if self.fan_speed != 0:
return self.fan_speed
else:
speed = 0
pwm = []
emc2305_chips = self.dx010_emc2305_chip
speed = 0
if self.is_psu_fan:
fan_speed_sysfs_name = "fan{}_input".format(self.fan_index+1)
fan_speed_sysfs_path = self.__search_file_by_name(
self.psu_hwmon_path, fan_speed_sysfs_name)
fan_speed_rpm = self.__read_txt_file(fan_speed_sysfs_path) or 0
fan_speed_raw = float(fan_speed_rpm)/PSU_FAN_MAX_RPM * 100
speed = math.ceil(float(fan_speed_rpm) * 100 / PSU_FAN_MAX_RPM)
elif self.get_presence():
chip = self.emc2305_chip_mapping[self.fan_index]
device = chip['device']
fan_index = chip['index_map']
sysfs_path = "%s%s/%s" % (
EMC2305_PATH, device, EMC2305_FAN_INPUT)
sysfs_path = sysfs_path.format(fan_index[self.fan_tray_index])
raw = self.__read_txt_file(sysfs_path).strip('\r\n')
pwm = int(raw, 10) if raw else 0
speed = math.ceil(float(pwm * 100 / EMC2305_MAX_PWM))
for chip in emc2305_chips:
device = chip['device']
fan_index = chip['index_map']
sysfs_path = "%s%s/%s" % (
EMC2305_PATH, device, EMC2305_FAN_INPUT)
sysfs_path = sysfs_path.format(fan_index[self.index])
try:
with open(sysfs_path, 'r') as file:
raw = file.read().strip('\r\n')
pwm.append(int(raw, 10))
except IOError:
raise IOError("Unable to open " + sysfs_path)
speed = math.ceil(
float(pwm[0]) * 100 / EMC2305_MAX_PWM)
return int(speed)
return int(speed)
def get_target_speed(self):
"""
DX010 platform specific data:
Retrieves the target (expected) speed of the fan
Returns:
An integer, the percentage of full fan speed, in the range 0 (off)
to 100 (full speed)
Note:
speed_pc = pwm_target/255*100
0 : when PWM mode is use
pwm : when pwm mode is not use
"""
target = 0
pwm = []
emc2305_chips = self.dx010_emc2305_chip
for chip in emc2305_chips:
if not self.is_psu_fan:
chip = self.emc2305_chip_mapping[self.fan_index]
device = chip['device']
fan_index = chip['index_map']
sysfs_path = "%s%s/%s" % (
EMC2305_PATH, device, EMC2305_FAN_TARGET)
sysfs_path = sysfs_path.format(fan_index[self.index])
try:
with open(sysfs_path, 'r') as file:
raw = file.read().strip('\r\n')
pwm.append(int(raw, 10))
except IOError:
raise IOError("Unable to open " + sysfs_path)
target = pwm[0] * 100 / EMC2305_MAX_PWM
sysfs_path = sysfs_path.format(fan_index[self.fan_tray_index])
raw = self.__read_txt_file(sysfs_path).strip('\r\n')
pwm = int(raw, 10) if raw else 0
target = math.ceil(float(pwm) * 100 / EMC2305_MAX_PWM)
return target
@ -177,55 +208,68 @@ class Fan(FanBase):
def set_speed(self, speed):
"""
Depends on pwm or target mode is selected:
Sets the fan speed
Args:
speed: An integer, the percentage of full fan speed to set fan to,
in the range 0 (off) to 100 (full speed)
Returns:
A boolean, True if speed is set successfully, False if not
Note:
Depends on pwm or target mode is selected:
1) pwm = speed_pc * 255 <-- Currently use this mode.
2) target_pwm = speed_pc * 100 / 255
2.1) set pwm{}_enable to 3
"""
pwm = speed * 255 / 100
emc2305_chips = self.dx010_emc2305_chip
for chip in emc2305_chips:
if not self.is_psu_fan and self.get_presence():
chip = self.emc2305_chip_mapping[self.fan_index]
device = chip['device']
fan_index = chip['index_map']
sysfs_path = "%s%s/%s" % (
EMC2305_PATH, device, EMC2305_FAN_PWM)
sysfs_path = sysfs_path.format(fan_index[self.index])
sysfs_path = sysfs_path.format(fan_index[self.fan_tray_index])
return self.__write_txt_file(sysfs_path, int(pwm))
return False
def set_status_led(self, color):
"""
Sets the state of the fan module status LED
Args:
color: A string representing the color with which to set the
fan module status LED
Returns:
bool: True if status LED state is set successfully, False if not
"""
set_status_led = False
if not self.is_psu_fan:
s1, s2 = False, False
try:
with open(sysfs_path, 'w') as file:
file.write(str(int(pwm)))
if color == self.STATUS_LED_COLOR_GREEN:
s1 = self.__set_gpio_value(
self.dx010_fan_gpio[self.fan_tray_index+1]['color']['red'], 1)
s2 = self.__set_gpio_value(
self.dx010_fan_gpio[self.fan_tray_index+1]['color']['green'], 0)
elif color == self.STATUS_LED_COLOR_RED:
s1 = self.__set_gpio_value(
self.dx010_fan_gpio[self.fan_tray_index+1]['color']['red'], 0)
s2 = self.__set_gpio_value(
self.dx010_fan_gpio[self.fan_tray_index+1]['color']['green'], 1)
elif color == self.STATUS_LED_COLOR_OFF:
s1 = self.__set_gpio_value(
self.dx010_fan_gpio[self.fan_tray_index+1]['color']['red'], 1)
s2 = self.__set_gpio_value(
self.dx010_fan_gpio[self.fan_tray_index+1]['color']['green'], 1)
set_status_led = s1 and s2
return set_status_led
except IOError:
return False
return True
def set_status_led(self, color):
try:
if color == self.STATUS_LED_COLOR_GREEN:
self.set_gpio_value(
self.dx010_fan_gpio[self.index+1]['color']['red'], 1)
self.set_gpio_value(
self.dx010_fan_gpio[self.index+1]['color']['green'], 0)
elif color == self.STATUS_LED_COLOR_RED:
self.set_gpio_value(
self.dx010_fan_gpio[self.index+1]['color']['red'], 0)
self.set_gpio_value(
self.dx010_fan_gpio[self.index+1]['color']['green'], 1)
elif color == self.STATUS_LED_COLOR_OFF:
self.set_gpio_value(
self.dx010_fan_gpio[self.index+1]['color']['red'], 1)
self.set_gpio_value(
self.dx010_fan_gpio[self.index+1]['color']['green'], 1)
else:
return False
except IOError:
return False
return True
return set_status_led
def get_name(self):
"""
@ -233,7 +277,10 @@ class Fan(FanBase):
Returns:
string: The name of the device
"""
return FAN_NAME_LIST[self.index]
fan_name = FAN_NAME_LIST[self.fan_tray_index*2 + self.fan_index] if not self.is_psu_fan else "PSU-{} FAN-{}".format(
self.psu_index+1, self.fan_index+1)
return fan_name
def get_presence(self):
"""
@ -241,6 +288,7 @@ class Fan(FanBase):
Returns:
bool: True if PSU is present, False if not
"""
raw = self.get_gpio_value(self.dx010_fan_gpio[self.index+1]['prs'])
present_str = self.__get_gpio_value(
self.dx010_fan_gpio[self.fan_tray_index+1]['prs'])
return int(raw, 10) == 0
return int(present_str, 10) == 0 if not self.is_psu_fan else True

View File

@ -8,7 +8,7 @@
#
#############################################################################
import os.path
import os
import sonic_platform
try:
@ -17,66 +17,152 @@ try:
except ImportError as e:
raise ImportError(str(e) + "- required module not found")
FAN_DX010_SPEED_PATH = "/sys/class/hwmon/hwmon{}/fan1_input"
GREEN_LED_PATH = "/sys/devices/platform/leds_dx010/leds/dx010:green:p-{}/brightness"
FAN_MAX_RPM = 11000
SYS_GPIO_DIR = "/sys/class/gpio"
HWMON_PATH = "/sys/bus/i2c/devices/i2c-{0}/{0}-00{1}/hwmon"
GPIO_DIR = "/sys/class/gpio"
GPIO_LABEL = "pca9505"
PSU_NAME_LIST = ["PSU-1", "PSU-2"]
PSU_NUM_FAN = [1, 1]
PSU_I2C_MAPPING = {
0: {
"num": 10,
"addr": "5a"
},
1: {
"num": 11,
"addr": "5b"
},
}
class Psu(PsuBase):
"""Platform-specific Psu class"""
def __init__(self, psu_index):
PsuBase.__init__(self)
self.index = psu_index
self.green_led_path = GREEN_LED_PATH.format(self.index+1)
self.dx010_psu_gpio = [
{'base': self.get_gpio_base()},
{'base': self.__get_gpio_base()},
{'prs': 27, 'status': 22},
{'prs': 28, 'status': 25}
]
self.i2c_num = PSU_I2C_MAPPING[self.index]["num"]
self.i2c_addr = PSU_I2C_MAPPING[self.index]["addr"]
self.hwmon_path = HWMON_PATH.format(self.i2c_num, self.i2c_addr)
for fan_index in range(0, PSU_NUM_FAN[self.index]):
fan = Fan(fan_index, 0, is_psu_fan=True, psu_index=self.index)
self._fan_list.append(fan)
PsuBase.__init__(self)
def get_gpio_base(self):
for r in os.listdir(SYS_GPIO_DIR):
if "gpiochip" in r:
def __read_txt_file(self, file_path):
try:
with open(file_path, 'r') as fd:
data = fd.read()
return data.strip()
except IOError:
pass
return ""
def __search_file_by_contain(self, directory, search_str, file_start):
for dirpath, dirnames, files in os.walk(directory):
for name in files:
file_path = os.path.join(dirpath, name)
if name.startswith(file_start) and search_str in self.__read_txt_file(file_path):
return file_path
return None
def __get_gpio_base(self):
for r in os.listdir(GPIO_DIR):
label_path = os.path.join(GPIO_DIR, r, "label")
if "gpiochip" in r and GPIO_LABEL in self.__read_txt_file(label_path):
return int(r[8:], 10)
return 216 # Reserve
def get_gpio_value(self, pinnum):
def __get_gpio_value(self, pinnum):
gpio_base = self.dx010_psu_gpio[0]['base']
gpio_file = "{}/gpio{}/value".format(SYS_GPIO_DIR,
str(gpio_base+pinnum))
gpio_dir = GPIO_DIR + '/gpio' + str(gpio_base+pinnum)
gpio_file = gpio_dir + "/value"
retval = self.__read_txt_file(gpio_file)
return retval.rstrip('\r\n')
try:
with open(gpio_file, 'r') as fd:
retval = fd.read()
except IOError:
raise IOError("Unable to open " + gpio_file + "file !")
retval = retval.rstrip('\r\n')
return retval
def get_fan(self):
def get_voltage(self):
"""
Retrieves object representing the fan module contained in this PSU
Retrieves current PSU voltage output
Returns:
An object dervied from FanBase representing the fan module
contained in this PSU
A float number, the output voltage in volts,
e.g. 12.1
"""
psu_voltage = 0.0
voltage_name = "in{}_input"
voltage_label = "vout1"
fan_speed_path = FAN_DX010_SPEED_PATH.format(
str(self.index+8))
try:
with open(fan_speed_path) as fan_speed_file:
fan_speed_rpm = int(fan_speed_file.read())
except IOError:
fan_speed = 0
vout_label_path = self.__search_file_by_contain(
self.hwmon_path, voltage_label, "in")
if vout_label_path:
dir_name = os.path.dirname(vout_label_path)
basename = os.path.basename(vout_label_path)
in_num = filter(str.isdigit, basename)
vout_path = os.path.join(
dir_name, voltage_name.format(in_num))
vout_val = self.__read_txt_file(vout_path)
psu_voltage = float(vout_val) / 1000
fan_speed = float(fan_speed_rpm)/FAN_MAX_RPM * 100
fan = Fan(0)
fan.fan_speed = int(fan_speed) if int(fan_speed) <= 100 else 100
return fan
return psu_voltage
def get_current(self):
"""
Retrieves present electric current supplied by PSU
Returns:
A float number, the electric current in amperes, e.g 15.4
"""
psu_current = 0.0
current_name = "curr{}_input"
current_label = "iout1"
curr_label_path = self.__search_file_by_contain(
self.hwmon_path, current_label, "cur")
if curr_label_path:
dir_name = os.path.dirname(curr_label_path)
basename = os.path.basename(curr_label_path)
cur_num = filter(str.isdigit, basename)
cur_path = os.path.join(
dir_name, current_name.format(cur_num))
cur_val = self.__read_txt_file(cur_path)
psu_current = float(cur_val) / 1000
return psu_current
def get_power(self):
"""
Retrieves current energy supplied by PSU
Returns:
A float number, the power in watts, e.g. 302.6
"""
psu_power = 0.0
current_name = "power{}_input"
current_label = "pout1"
pw_label_path = self.__search_file_by_contain(
self.hwmon_path, current_label, "power")
if pw_label_path:
dir_name = os.path.dirname(pw_label_path)
basename = os.path.basename(pw_label_path)
pw_num = filter(str.isdigit, basename)
pw_path = os.path.join(
dir_name, current_name.format(pw_num))
pw_val = self.__read_txt_file(pw_path)
psu_power = float(pw_val) / 1000000
return psu_power
def get_powergood_status(self):
"""
Retrieves the powergood status of PSU
Returns:
A boolean, True if PSU has stablized its output voltages and passed all
its internal self-tests, False if not.
"""
return self.get_status()
def set_status_led(self, color):
"""
@ -104,6 +190,20 @@ class Psu(PsuBase):
return True
def get_status_led(self):
"""
Gets the state of the PSU status LED
Returns:
A string, one of the predefined STATUS_LED_COLOR_* strings above
"""
status = self.__read_txt_file(self.green_led_path)
status_str = {
'255': self.STATUS_LED_COLOR_GREEN,
'0': self.STATUS_LED_COLOR_OFF
}.get(status, None)
return status_str
def get_name(self):
"""
Retrieves the name of the device
@ -118,7 +218,7 @@ class Psu(PsuBase):
Returns:
bool: True if PSU is present, False if not
"""
raw = self.get_gpio_value(self.dx010_psu_gpio[self.index+1]['prs'])
raw = self.__get_gpio_value(self.dx010_psu_gpio[self.index+1]['prs'])
return int(raw, 10) == 0
def get_status(self):
@ -127,5 +227,6 @@ class Psu(PsuBase):
Returns:
A boolean value, True if device is operating properly, False if not
"""
raw = self.get_gpio_value(self.dx010_psu_gpio[self.index+1]['status'])
raw = self.__get_gpio_value(
self.dx010_psu_gpio[self.index+1]['status'])
return int(raw, 10) == 1

View File

@ -15,47 +15,76 @@ import sonic_device_util
from ctypes import create_string_buffer
try:
from swsssdk import ConfigDBConnector
from sonic_platform_base.sfp_base import SfpBase
from sonic_platform_base.sonic_sfp.sfputilbase import SfpUtilBase
from sonic_platform_base.sonic_sfp.sff8436 import sff8436Dom
from sonic_platform_base.sonic_sfp.sff8436 import sff8436InterfaceId
from sonic_platform_base.sonic_sfp.sfputilhelper import SfpUtilHelper
except ImportError as e:
raise ImportError(str(e) + "- required module not found")
INFO_OFFSET = 128
DOM_OFFSET = 0
class Sfp(SfpBase, SfpUtilBase):
XCVR_INTFACE_BULK_OFFSET = 0
XCVR_INTFACE_BULK_WIDTH_QSFP = 20
XCVR_HW_REV_WIDTH_QSFP = 2
XCVR_CABLE_LENGTH_WIDTH_QSFP = 5
XCVR_VENDOR_NAME_OFFSET = 20
XCVR_VENDOR_NAME_WIDTH = 16
XCVR_VENDOR_OUI_OFFSET = 37
XCVR_VENDOR_OUI_WIDTH = 3
XCVR_VENDOR_PN_OFFSET = 40
XCVR_VENDOR_PN_WIDTH = 16
XCVR_HW_REV_OFFSET = 56
XCVR_HW_REV_WIDTH_OSFP = 2
XCVR_HW_REV_WIDTH_SFP = 4
XCVR_VENDOR_SN_OFFSET = 68
XCVR_VENDOR_SN_WIDTH = 16
XCVR_VENDOR_DATE_OFFSET = 84
XCVR_VENDOR_DATE_WIDTH = 8
XCVR_DOM_CAPABILITY_OFFSET = 92
XCVR_DOM_CAPABILITY_WIDTH = 1
# Offset for values in QSFP eeprom
QSFP_DOM_REV_OFFSET = 1
QSFP_DOM_REV_WIDTH = 1
QSFP_TEMPE_OFFSET = 22
QSFP_TEMPE_WIDTH = 2
QSFP_VOLT_OFFSET = 26
QSFP_VOLT_WIDTH = 2
QSFP_CHANNL_MON_OFFSET = 34
QSFP_CHANNL_MON_WIDTH = 16
QSFP_CHANNL_MON_WITH_TX_POWER_WIDTH = 24
QSFP_CONTROL_OFFSET = 86
QSFP_CONTROL_WIDTH = 8
QSFP_CHANNL_RX_LOS_STATUS_OFFSET = 3
QSFP_CHANNL_RX_LOS_STATUS_WIDTH = 1
QSFP_CHANNL_TX_FAULT_STATUS_OFFSET = 4
QSFP_CHANNL_TX_FAULT_STATUS_WIDTH = 1
QSFP_POWEROVERRIDE_OFFSET = 93
QSFP_POWEROVERRIDE_WIDTH = 1
QSFP_MODULE_THRESHOLD_OFFSET = 128
QSFP_MODULE_THRESHOLD_WIDTH = 24
QSFP_CHANNEL_THRESHOLD_OFFSET = 176
QSFP_CHANNEL_THRESHOLD_WIDTH = 16
qsfp_cable_length_tup = ('Length(km)', 'Length OM3(2m)',
'Length OM2(m)', 'Length OM1(m)',
'Length Cable Assembly(m)')
qsfp_compliance_code_tup = ('10/40G Ethernet Compliance Code', 'SONET Compliance codes',
'SAS/SATA compliance codes', 'Gigabit Ethernet Compliant codes',
'Fibre Channel link length/Transmitter Technology',
'Fibre Channel transmission media', 'Fibre Channel Speed')
class Sfp(SfpBase):
"""Platform-specific Sfp class"""
# Port number
PORT_START = 1
PORT_END = 32
PORTS_IN_BLOCK = 32
# Offset for values in QSFP info eeprom
QSFP_CONTROL_OFFSET = 86
QSFP_CONTROL_WIDTH = 8
QSFP_CHANNL_RX_LOS_STATUS_OFFSET = 3
QSFP_CHANNL_RX_LOS_STATUS_WIDTH = 1
QSFP_CHANNL_TX_FAULT_STATUS_OFFSET = 4
QSFP_CHANNL_TX_FAULT_STATUS_WIDTH = 1
QSFP_POWEROVERRIDE_OFFSET = 93
QSFP_POWEROVERRIDE_WIDTH = 1
# Key for values in QSFP eeprom dict
QSFP_EEPROM_TYPE_KEY = "Identifier"
QSFP_EEPROM_HW_REV_KEY = "Vendor Rev"
QSFP_EEPROM_MF_NAME_KEY = "Vendor Name"
QSFP_EEPROM_MODEL_NAME_KEY = "Vendor PN"
QSFP_EEPROM_SERIAL_KEY = "Vendor SN"
QSFP_EEPROM_CONNECTOR_KEY = "Connector"
QSFP_EEPROM_ENCODE_KEY = "Encoding"
QSFP_EEPROM_EXT_IDENT_KEY = "Extended Identifier"
QSFP_EEPROM_EXT_RATE_KEY = "Extended RateSelect Compliance"
QSFP_EEPROM_CABLE_KEY = "Length(km)"
QSFP_EEPROM_BIT_RATE_KEY = "Nominal Bit Rate(100Mbs)"
QSFP_EEPROM_SPEC_COM_KEY = "Specification compliance"
QSFP_EEPROM_DATE_KEY = "Vendor Date Code(YYYY-MM-DD Lot)"
QSFP_EEPROM_OUI_KEY = "Vendor OUI"
# Path to QSFP sysfs
RESET_PATH = "/sys/devices/platform/dx010_cpld/qsfp_reset"
@ -63,27 +92,33 @@ class Sfp(SfpBase, SfpUtilBase):
PRS_PATH = "/sys/devices/platform/dx010_cpld/qsfp_modprs"
PLATFORM_ROOT_PATH = "/usr/share/sonic/device"
PMON_HWSKU_PATH = "/usr/share/sonic/hwsku"
PLATFORM = "x86_64-cel_seastone-r0"
HWSKU = "Seastone-DX010"
HOST_CHK_CMD = "docker > /dev/null 2>&1"
_port_to_eeprom_mapping = {}
PLATFORM = "x86_64-cel_seastone-r0"
HWSKU = "Seastone-DX010"
@property
def port_start(self):
return self.PORT_START
def __init__(self, sfp_index):
# Init index
self.index = sfp_index
self.port_num = self.index + 1 if self.PORT_START == 1 else index
@property
def port_end(self):
return self.PORT_END
# Init eeprom path
eeprom_path = '/sys/bus/i2c/devices/i2c-{0}/{0}-0050/eeprom'
self.port_to_eeprom_mapping = {}
for x in range(self.PORT_START, self.PORT_END + 1):
p_num = x - 1 if self.PORT_START == 1 else x
self.port_to_eeprom_mapping[x] = eeprom_path.format(p_num + 26)
@property
def qsfp_ports(self):
return range(self.PORT_START, self.PORTS_IN_BLOCK + 1)
self.info_dict_keys = ['type', 'hardwarerev', 'serialnum', 'manufacturename', 'modelname', 'Connector', 'encoding', 'ext_identifier',
'ext_rateselect_compliance', 'cable_type', 'cable_length', 'nominal_bit_rate', 'specification_compliance', 'vendor_date', 'vendor_oui']
@property
def port_to_eeprom_mapping(self):
return self._port_to_eeprom_mapping
self.dom_dict_keys = ['rx_los', 'tx_fault', 'reset_status', 'power_lpmode', 'tx_disable', 'tx_disable_channel', 'temperature', 'voltage',
'rx1power', 'rx2power', 'rx3power', 'rx4power', 'tx1bias', 'tx2bias', 'tx3bias', 'tx4bias', 'tx1power', 'tx2power', 'tx3power', 'tx4power']
self.threshold_dict_keys = ['temphighalarm', 'temphighwarning', 'templowalarm', 'templowwarning', 'vcchighalarm', 'vcchighwarning', 'vcclowalarm', 'vcclowwarning', 'rxpowerhighalarm', 'rxpowerhighwarning',
'rxpowerlowalarm', 'rxpowerlowwarning', 'txpowerhighalarm', 'txpowerhighwarning', 'txpowerlowalarm', 'txpowerlowwarning', 'txbiashighalarm', 'txbiashighwarning', 'txbiaslowalarm', 'txbiaslowwarning']
SfpBase.__init__(self)
def _convert_string_to_num(self, value_str):
if "-inf" in value_str:
@ -105,88 +140,14 @@ class Sfp(SfpBase, SfpUtilBase):
else:
return 'N/A'
def get_low_power_mode(self, port_num):
# Check for invalid port_num
if port_num < self.port_start or port_num > self.port_end:
return False
def __read_txt_file(self, file_path):
try:
reg_file = open(self.LP_PATH, "r")
content = reg_file.readline().rstrip()
except IOError as e:
print("Error: unable to open file: %s" % str(e))
return False
# content is a string containing the hex representation of the register
reg_value = int(content, 16)
# Determind if port_num start from 1 or 0
bit_index = port_num - 1 if self.port_start == 1 else port_num
# Mask off the bit corresponding to our port
mask = (1 << bit_index)
# LPMode is active high
if reg_value & mask == 0:
return False
return True
def set_low_power_mode(self, port_num, lpmode):
try:
reg_file = open(self.LP_PATH, "r+")
except IOError as e:
print("Error: unable to open file: %s" % str(e))
return False
content = reg_file.readline().rstrip()
# content is a string containing the hex representation of the register
reg_value = int(content, 16)
# Determind if port_num start from 1 or 0
bit_index = port_num - 1 if self.port_start == 1 else port_num
# Mask off the bit corresponding to our port
mask = (1 << bit_index)
# LPMode is active high; set or clear the bit accordingly
reg_value = reg_value | mask if lpmode else reg_value & ~mask
# Convert our register value back to a hex string and write back
content = hex(reg_value).strip('L')
reg_file.seek(0)
reg_file.write(content)
reg_file.close()
return True
def get_transceiver_change_event(self, timeout=0):
raise NotImplementedError
def __init__(self, sfp_index):
# Init SfpUtilBase
eeprom_path = '/sys/bus/i2c/devices/i2c-{0}/{0}-0050/eeprom'
for x in range(self.PORT_START, self.PORT_END + 1):
if self.port_start == 1:
self._port_to_eeprom_mapping[x] = eeprom_path.format(
(x - 1) + 26)
else:
self._port_to_eeprom_mapping[x] = eeprom_path.format(x + 26)
self.__read_porttab()
SfpUtilBase.__init__(self)
# Init index
self.index = sfp_index
self.port_num = self.index + 1
def __read_porttab(self):
try:
self.read_porttab_mappings(self.__get_path_to_port_config_file())
except:
with open(file_path, 'r') as fd:
data = fd.read()
return data.strip()
except IOError:
pass
return ""
def __is_host(self):
return os.system(self.HOST_CHK_CMD) == 0
@ -199,19 +160,24 @@ class Sfp(SfpBase, SfpUtilBase):
def __read_eeprom_specific_bytes(self, offset, num_bytes):
sysfsfile_eeprom = None
eeprom_raw = None
eeprom_raw = []
for i in range(0, num_bytes):
eeprom_raw.append("0x00")
sysfs_sfp_i2c_client_eeprom_path = self.port_to_eeprom_mapping[self.port_num]
try:
sysfsfile_eeprom = open(
sysfs_sfp_i2c_client_eeprom_path, mode="rb", buffering=0)
except IOError:
print("Error: reading sysfs file %s" %
sysfs_sfp_i2c_client_eeprom_path)
sysfsfile_eeprom.seek(offset)
raw = sysfsfile_eeprom.read(num_bytes)
for n in range(0, num_bytes):
eeprom_raw[n] = hex(ord(raw[n]))[2:].zfill(2)
except:
pass
finally:
if sysfsfile_eeprom:
eeprom_raw = self._read_eeprom_specific_bytes(
sysfsfile_eeprom, offset, num_bytes)
sysfsfile_eeprom.close()
return eeprom_raw
def get_transceiver_info(self):
@ -238,45 +204,84 @@ class Sfp(SfpBase, SfpUtilBase):
vendor_oui |1*255VCHAR |vendor OUI
========================================================================
"""
transceiver_info_dict = dict()
# get eeprom data
self.eeprom_dict = self.get_eeprom_dict(self.port_num)
if self.eeprom_dict and self.eeprom_dict.get('interface'):
transceiver_info_data = self.eeprom_dict['interface'].get('data')
# check present status
sfpi_obj = sff8436InterfaceId()
if not self.get_presence() or not sfpi_obj:
return {}
# set specification_compliance
spec_com = transceiver_info_data.get(
self.QSFP_EEPROM_SPEC_COM_KEY, {})
spec_com_str = "/".join(list(spec_com.values()))
offset = INFO_OFFSET
# set normal transceiver info
transceiver_info_dict['type'] = transceiver_info_data.get(
self.QSFP_EEPROM_TYPE_KEY, 'N/A')
transceiver_info_dict['hardwarerev'] = transceiver_info_data.get(
self.QSFP_EEPROM_HW_REV_KEY, 'N/A')
transceiver_info_dict['manufacturename'] = transceiver_info_data.get(
self.QSFP_EEPROM_MF_NAME_KEY, 'N/A')
transceiver_info_dict['modelname'] = transceiver_info_data.get(
self.QSFP_EEPROM_MODEL_NAME_KEY, 'N/A')
transceiver_info_dict['serialnum'] = transceiver_info_data.get(
self.QSFP_EEPROM_SERIAL_KEY, 'N/A')
transceiver_info_dict['Connector'] = transceiver_info_data.get(
self.QSFP_EEPROM_CONNECTOR_KEY, 'N/A')
transceiver_info_dict['encoding'] = transceiver_info_data.get(
self.QSFP_EEPROM_ENCODE_KEY, 'N/A')
transceiver_info_dict['ext_identifier'] = transceiver_info_data.get(
self.QSFP_EEPROM_EXT_IDENT_KEY, 'N/A')
transceiver_info_dict['ext_rateselect_compliance'] = transceiver_info_data.get(
self.QSFP_EEPROM_EXT_RATE_KEY, 'N/A')
transceiver_info_dict['cable_length'] = transceiver_info_data.get(
self.QSFP_EEPROM_CABLE_KEY, 'N/A')
transceiver_info_dict['vendor_date'] = transceiver_info_data.get(
self.QSFP_EEPROM_DATE_KEY, 'N/A')
transceiver_info_dict['vendor_oui'] = transceiver_info_data.get(
self.QSFP_EEPROM_OUI_KEY, 'N/A')
transceiver_info_dict['nominal_bit_rate'] = transceiver_info_data.get(
self.QSFP_EEPROM_BIT_RATE_KEY, 'N/A')
transceiver_info_dict['specification_compliance'] = spec_com_str or "N/A"
sfp_interface_bulk_raw = self.__read_eeprom_specific_bytes(
(offset + XCVR_INTFACE_BULK_OFFSET), XCVR_INTFACE_BULK_WIDTH_QSFP)
sfp_interface_bulk_data = sfpi_obj.parse_sfp_info_bulk(
sfp_interface_bulk_raw, 0)
sfp_vendor_name_raw = self.__read_eeprom_specific_bytes(
(offset + XCVR_VENDOR_NAME_OFFSET), XCVR_VENDOR_NAME_WIDTH)
sfp_vendor_name_data = sfpi_obj.parse_vendor_name(
sfp_vendor_name_raw, 0)
sfp_vendor_pn_raw = self.__read_eeprom_specific_bytes(
(offset + XCVR_VENDOR_PN_OFFSET), XCVR_VENDOR_PN_WIDTH)
sfp_vendor_pn_data = sfpi_obj.parse_vendor_pn(
sfp_vendor_pn_raw, 0)
sfp_vendor_rev_raw = self.__read_eeprom_specific_bytes(
(offset + XCVR_HW_REV_OFFSET), XCVR_HW_REV_WIDTH_QSFP)
sfp_vendor_rev_data = sfpi_obj.parse_vendor_rev(
sfp_vendor_rev_raw, 0)
sfp_vendor_sn_raw = self.__read_eeprom_specific_bytes(
(offset + XCVR_VENDOR_SN_OFFSET), XCVR_VENDOR_SN_WIDTH)
sfp_vendor_sn_data = sfpi_obj.parse_vendor_sn(
sfp_vendor_sn_raw, 0)
sfp_vendor_oui_raw = self.__read_eeprom_specific_bytes(
(offset + XCVR_VENDOR_OUI_OFFSET), XCVR_VENDOR_OUI_WIDTH)
if sfp_vendor_oui_raw is not None:
sfp_vendor_oui_data = sfpi_obj.parse_vendor_oui(
sfp_vendor_oui_raw, 0)
sfp_vendor_date_raw = self.__read_eeprom_specific_bytes(
(offset + XCVR_VENDOR_DATE_OFFSET), XCVR_VENDOR_DATE_WIDTH)
sfp_vendor_date_data = sfpi_obj.parse_vendor_date(
sfp_vendor_date_raw, 0)
transceiver_info_dict = dict.fromkeys(self.info_dict_keys, 'N/A')
compliance_code_dict = dict()
if sfp_interface_bulk_data:
transceiver_info_dict['type'] = sfp_interface_bulk_data['data']['type']['value']
transceiver_info_dict['Connector'] = sfp_interface_bulk_data['data']['Connector']['value']
transceiver_info_dict['encoding'] = sfp_interface_bulk_data['data']['EncodingCodes']['value']
transceiver_info_dict['ext_identifier'] = sfp_interface_bulk_data['data']['Extended Identifier']['value']
transceiver_info_dict['ext_rateselect_compliance'] = sfp_interface_bulk_data['data']['RateIdentifier']['value']
transceiver_info_dict['type_abbrv_name'] = sfp_interface_bulk_data['data']['type_abbrv_name']['value']
transceiver_info_dict['manufacturename'] = sfp_vendor_name_data[
'data']['Vendor Name']['value'] if sfp_vendor_name_data else 'N/A'
transceiver_info_dict['modelname'] = sfp_vendor_pn_data['data']['Vendor PN']['value'] if sfp_vendor_pn_data else 'N/A'
transceiver_info_dict['hardwarerev'] = sfp_vendor_rev_data['data']['Vendor Rev']['value'] if sfp_vendor_rev_data else 'N/A'
transceiver_info_dict['serialnum'] = sfp_vendor_sn_data['data']['Vendor SN']['value'] if sfp_vendor_sn_data else 'N/A'
transceiver_info_dict['vendor_oui'] = sfp_vendor_oui_data['data']['Vendor OUI']['value'] if sfp_vendor_oui_data else 'N/A'
transceiver_info_dict['vendor_date'] = sfp_vendor_date_data[
'data']['VendorDataCode(YYYY-MM-DD Lot)']['value'] if sfp_vendor_date_data else 'N/A'
transceiver_info_dict['cable_type'] = "Unknown"
transceiver_info_dict['cable_length'] = "Unknown"
for key in qsfp_cable_length_tup:
if key in sfp_interface_bulk_data['data']:
transceiver_info_dict['cable_type'] = key
transceiver_info_dict['cable_length'] = str(
sfp_interface_bulk_data['data'][key]['value'])
for key in qsfp_compliance_code_tup:
if key in sfp_interface_bulk_data['data']['Specification compliance']['value']:
compliance_code_dict[key] = sfp_interface_bulk_data['data']['Specification compliance']['value'][key]['value']
transceiver_info_dict['specification_compliance'] = str(
compliance_code_dict)
transceiver_info_dict['nominal_bit_rate'] = str(
sfp_interface_bulk_data['data']['Nominal Bit Rate(100Mbs)']['value'])
return transceiver_info_dict
@ -288,83 +293,200 @@ class Sfp(SfpBase, SfpUtilBase):
========================================================================
keys |Value Format |Information
---------------------------|---------------|----------------------------
RX LOS |BOOLEAN |RX lost-of-signal status,
| |True if has RX los, False if not.
TX FAULT |BOOLEAN |TX fault status,
| |True if has TX fault, False if not.
Reset status |BOOLEAN |reset status,
| |True if SFP in reset, False if not.
LP mode |BOOLEAN |low power mode status,
| |True in lp mode, False if not.
TX disable |BOOLEAN |TX disable status,
| |True TX disabled, False if not.
TX disabled channel |HEX |disabled TX channles in hex,
| |bits 0 to 3 represent channel 0
rx_los |BOOLEAN |RX loss-of-signal status, True if has RX los, False if not.
tx_fault |BOOLEAN |TX fault status, True if has TX fault, False if not.
reset_status |BOOLEAN |reset status, True if SFP in reset, False if not.
lp_mode |BOOLEAN |low power mode status, True in lp mode, False if not.
tx_disable |BOOLEAN |TX disable status, True TX disabled, False if not.
tx_disabled_channel |HEX |disabled TX channels in hex, bits 0 to 3 represent channel 0
| |to channel 3.
Temperature |INT |module temperature in Celsius
Voltage |INT |supply voltage in mV
TX bias |INT |TX Bias Current in mA
RX power |INT |received optical power in mW
TX power |INT |TX output power in mW
temperature |INT |module temperature in Celsius
voltage |INT |supply voltage in mV
tx<n>bias |INT |TX Bias Current in mA, n is the channel number,
| |for example, tx2bias stands for tx bias of channel 2.
rx<n>power |INT |received optical power in mW, n is the channel number,
| |for example, rx2power stands for rx power of channel 2.
tx<n>power |INT |TX output power in mW, n is the channel number,
| |for example, tx2power stands for tx power of channel 2.
========================================================================
"""
transceiver_dom_info_dict = dict()
self.eeprom_dict = self.get_eeprom_dict(self.port_num)
if self.eeprom_dict and self.eeprom_dict.get('dom'):
transceiver_dom_data = self.eeprom_dict['dom'].get('data', {})
transceiver_dom_data_mmv = transceiver_dom_data.get(
"ModuleMonitorValues")
transceiver_dom_data_cmv = transceiver_dom_data.get(
"ChannelMonitorValues")
transceiver_dom_info_dict['temperature'] = transceiver_dom_data_mmv.get(
'Temperature', 'N/A')
transceiver_dom_info_dict['voltage'] = transceiver_dom_data_mmv.get(
'Vcc', 'N/A')
transceiver_dom_info_dict['rx1power'] = transceiver_dom_data_cmv.get(
'RX1Power', 'N/A')
transceiver_dom_info_dict['rx2power'] = transceiver_dom_data_cmv.get(
'RX2Power', 'N/A')
transceiver_dom_info_dict['rx3power'] = transceiver_dom_data_cmv.get(
'RX3Power', 'N/A')
transceiver_dom_info_dict['rx4power'] = transceiver_dom_data_cmv.get(
'RX4Power', 'N/A')
transceiver_dom_info_dict['tx1bias'] = transceiver_dom_data_cmv.get(
'TX1Bias', 'N/A')
transceiver_dom_info_dict['tx2bias'] = transceiver_dom_data_cmv.get(
'TX2Bias', 'N/A')
transceiver_dom_info_dict['tx3bias'] = transceiver_dom_data_cmv.get(
'TX3Bias', 'N/A')
transceiver_dom_info_dict['tx4bias'] = transceiver_dom_data_cmv.get(
'TX4Bias', 'N/A')
transceiver_dom_info_dict['tx1power'] = transceiver_dom_data_cmv.get(
'TX1Power', 'N/A')
transceiver_dom_info_dict['tx2power'] = transceiver_dom_data_cmv.get(
'TX2Power', 'N/A')
transceiver_dom_info_dict['tx3power'] = transceiver_dom_data_cmv.get(
'TX3Power', 'N/A')
transceiver_dom_info_dict['tx4power'] = transceiver_dom_data_cmv.get(
'TX4Power', 'N/A')
# check present status
sfpd_obj = sff8436Dom()
sfpi_obj = sff8436InterfaceId()
if not self.get_presence() or not sfpi_obj or not sfpd_obj:
return {}
transceiver_dom_info_dict = dict.fromkeys(self.dom_dict_keys, 'N/A')
offset = DOM_OFFSET
offset_xcvr = INFO_OFFSET
# QSFP capability byte parse, through this byte can know whether it support tx_power or not.
# TODO: in the future when decided to migrate to support SFF-8636 instead of SFF-8436,
# need to add more code for determining the capability and version compliance
# in SFF-8636 dom capability definitions evolving with the versions.
qsfp_dom_capability_raw = self.__read_eeprom_specific_bytes(
(offset_xcvr + XCVR_DOM_CAPABILITY_OFFSET), XCVR_DOM_CAPABILITY_WIDTH)
if qsfp_dom_capability_raw is not None:
qspf_dom_capability_data = sfpi_obj.parse_qsfp_dom_capability(
qsfp_dom_capability_raw, 0)
else:
return None
dom_temperature_raw = self.__read_eeprom_specific_bytes(
(offset + QSFP_TEMPE_OFFSET), QSFP_TEMPE_WIDTH)
if dom_temperature_raw is not None:
dom_temperature_data = sfpd_obj.parse_temperature(
dom_temperature_raw, 0)
transceiver_dom_info_dict['temperature'] = dom_temperature_data['data']['Temperature']['value']
dom_voltage_raw = self.__read_eeprom_specific_bytes(
(offset + QSFP_VOLT_OFFSET), QSFP_VOLT_WIDTH)
if dom_voltage_raw is not None:
dom_voltage_data = sfpd_obj.parse_voltage(dom_voltage_raw, 0)
transceiver_dom_info_dict['voltage'] = dom_voltage_data['data']['Vcc']['value']
qsfp_dom_rev_raw = self.__read_eeprom_specific_bytes(
(offset + QSFP_DOM_REV_OFFSET), QSFP_DOM_REV_WIDTH)
if qsfp_dom_rev_raw is not None:
qsfp_dom_rev_data = sfpd_obj.parse_sfp_dom_rev(qsfp_dom_rev_raw, 0)
qsfp_dom_rev = qsfp_dom_rev_data['data']['dom_rev']['value']
# The tx_power monitoring is only available on QSFP which compliant with SFF-8636
# and claimed that it support tx_power with one indicator bit.
dom_channel_monitor_data = {}
dom_channel_monitor_raw = None
qsfp_tx_power_support = qspf_dom_capability_data['data']['Tx_power_support']['value']
if (qsfp_dom_rev[0:8] != 'SFF-8636' or (qsfp_dom_rev[0:8] == 'SFF-8636' and qsfp_tx_power_support != 'on')):
dom_channel_monitor_raw = self.__read_eeprom_specific_bytes(
(offset + QSFP_CHANNL_MON_OFFSET), QSFP_CHANNL_MON_WIDTH)
if dom_channel_monitor_raw is not None:
dom_channel_monitor_data = sfpd_obj.parse_channel_monitor_params(
dom_channel_monitor_raw, 0)
else:
dom_channel_monitor_raw = self.__read_eeprom_specific_bytes(
(offset + QSFP_CHANNL_MON_OFFSET), QSFP_CHANNL_MON_WITH_TX_POWER_WIDTH)
if dom_channel_monitor_raw is not None:
dom_channel_monitor_data = sfpd_obj.parse_channel_monitor_params_with_tx_power(
dom_channel_monitor_raw, 0)
transceiver_dom_info_dict['tx1power'] = dom_channel_monitor_data['data']['TX1Power']['value']
transceiver_dom_info_dict['tx2power'] = dom_channel_monitor_data['data']['TX2Power']['value']
transceiver_dom_info_dict['tx3power'] = dom_channel_monitor_data['data']['TX3Power']['value']
transceiver_dom_info_dict['tx4power'] = dom_channel_monitor_data['data']['TX4Power']['value']
if dom_channel_monitor_raw:
transceiver_dom_info_dict['rx1power'] = dom_channel_monitor_data['data']['RX1Power']['value']
transceiver_dom_info_dict['rx2power'] = dom_channel_monitor_data['data']['RX2Power']['value']
transceiver_dom_info_dict['rx3power'] = dom_channel_monitor_data['data']['RX3Power']['value']
transceiver_dom_info_dict['rx4power'] = dom_channel_monitor_data['data']['RX4Power']['value']
transceiver_dom_info_dict['tx1bias'] = dom_channel_monitor_data['data']['TX1Bias']['value']
transceiver_dom_info_dict['tx2bias'] = dom_channel_monitor_data['data']['TX2Bias']['value']
transceiver_dom_info_dict['tx3bias'] = dom_channel_monitor_data['data']['TX3Bias']['value']
transceiver_dom_info_dict['tx4bias'] = dom_channel_monitor_data['data']['TX4Bias']['value']
for key in transceiver_dom_info_dict:
transceiver_dom_info_dict[key] = self._convert_string_to_num(
transceiver_dom_info_dict[key])
transceiver_dom_info_dict['rx_los'] = self.get_rx_los()
transceiver_dom_info_dict['tx_fault'] = self.get_tx_fault()
transceiver_dom_info_dict['reset_status'] = self.get_reset_status()
transceiver_dom_info_dict['lp_mode'] = self.get_lpmode()
return transceiver_dom_info_dict
def get_transceiver_threshold_info(self):
"""
Retrieves transceiver threshold info of this SFP
Returns:
A dict which contains following keys/values :
========================================================================
keys |Value Format |Information
---------------------------|---------------|----------------------------
temphighalarm |FLOAT |High Alarm Threshold value of temperature in Celsius.
templowalarm |FLOAT |Low Alarm Threshold value of temperature in Celsius.
temphighwarning |FLOAT |High Warning Threshold value of temperature in Celsius.
templowwarning |FLOAT |Low Warning Threshold value of temperature in Celsius.
vcchighalarm |FLOAT |High Alarm Threshold value of supply voltage in mV.
vcclowalarm |FLOAT |Low Alarm Threshold value of supply voltage in mV.
vcchighwarning |FLOAT |High Warning Threshold value of supply voltage in mV.
vcclowwarning |FLOAT |Low Warning Threshold value of supply voltage in mV.
rxpowerhighalarm |FLOAT |High Alarm Threshold value of received power in dBm.
rxpowerlowalarm |FLOAT |Low Alarm Threshold value of received power in dBm.
rxpowerhighwarning |FLOAT |High Warning Threshold value of received power in dBm.
rxpowerlowwarning |FLOAT |Low Warning Threshold value of received power in dBm.
txpowerhighalarm |FLOAT |High Alarm Threshold value of transmit power in dBm.
txpowerlowalarm |FLOAT |Low Alarm Threshold value of transmit power in dBm.
txpowerhighwarning |FLOAT |High Warning Threshold value of transmit power in dBm.
txpowerlowwarning |FLOAT |Low Warning Threshold value of transmit power in dBm.
txbiashighalarm |FLOAT |High Alarm Threshold value of tx Bias Current in mA.
txbiaslowalarm |FLOAT |Low Alarm Threshold value of tx Bias Current in mA.
txbiashighwarning |FLOAT |High Warning Threshold value of tx Bias Current in mA.
txbiaslowwarning |FLOAT |Low Warning Threshold value of tx Bias Current in mA.
========================================================================
"""
# check present status
sfpd_obj = sff8436Dom()
if not self.get_presence() or not sfpd_obj:
return {}
transceiver_dom_threshold_dict = dict.fromkeys(
self.threshold_dict_keys, 'N/A')
dom_thres_raw = self.__read_eeprom_specific_bytes(
QSFP_MODULE_THRESHOLD_OFFSET, QSFP_MODULE_THRESHOLD_WIDTH) if self.get_presence() and sfpd_obj else None
if dom_thres_raw:
module_threshold_values = sfpd_obj.parse_module_threshold_values(
dom_thres_raw, 0)
module_threshold_data = module_threshold_values.get('data')
if module_threshold_data:
transceiver_dom_threshold_dict['temphighalarm'] = module_threshold_data['TempHighAlarm']['value']
transceiver_dom_threshold_dict['templowalarm'] = module_threshold_data['TempLowAlarm']['value']
transceiver_dom_threshold_dict['temphighwarning'] = module_threshold_data['TempHighWarning']['value']
transceiver_dom_threshold_dict['templowwarning'] = module_threshold_data['TempLowWarning']['value']
transceiver_dom_threshold_dict['vcchighalarm'] = module_threshold_data['VccHighAlarm']['value']
transceiver_dom_threshold_dict['vcclowalarm'] = module_threshold_data['VccLowAlarm']['value']
transceiver_dom_threshold_dict['vcchighwarning'] = module_threshold_data['VccHighWarning']['value']
transceiver_dom_threshold_dict['vcclowwarning'] = module_threshold_data['VccLowWarning']['value']
dom_thres_raw = self.__read_eeprom_specific_bytes(
QSFP_CHANNEL_THRESHOLD_OFFSET, QSFP_CHANNEL_THRESHOLD_WIDTH) if self.get_presence() and sfpd_obj else None
channel_threshold_values = sfpd_obj.parse_channel_threshold_values(
dom_thres_raw, 0)
channel_threshold_data = channel_threshold_values.get('data')
if channel_threshold_data:
transceiver_dom_threshold_dict['rxpowerhighalarm'] = channel_threshold_data['RxPowerHighAlarm']['value']
transceiver_dom_threshold_dict['rxpowerlowalarm'] = channel_threshold_data['RxPowerLowAlarm']['value']
transceiver_dom_threshold_dict['rxpowerhighwarning'] = channel_threshold_data['RxPowerHighWarning']['value']
transceiver_dom_threshold_dict['rxpowerlowwarning'] = channel_threshold_data['RxPowerLowWarning']['value']
transceiver_dom_threshold_dict['txpowerhighalarm'] = "0.0dBm"
transceiver_dom_threshold_dict['txpowerlowalarm'] = "0.0dBm"
transceiver_dom_threshold_dict['txpowerhighwarning'] = "0.0dBm"
transceiver_dom_threshold_dict['txpowerlowwarning'] = "0.0dBm"
transceiver_dom_threshold_dict['txbiashighalarm'] = channel_threshold_data['TxBiasHighAlarm']['value']
transceiver_dom_threshold_dict['txbiaslowalarm'] = channel_threshold_data['TxBiasLowAlarm']['value']
transceiver_dom_threshold_dict['txbiashighwarning'] = channel_threshold_data['TxBiasHighWarning']['value']
transceiver_dom_threshold_dict['txbiaslowwarning'] = channel_threshold_data['TxBiasLowWarning']['value']
for key in transceiver_dom_threshold_dict:
transceiver_dom_threshold_dict[key] = self._convert_string_to_num(
transceiver_dom_threshold_dict[key])
return transceiver_dom_threshold_dict
def get_reset_status(self):
"""
Retrieves the reset status of SFP
Returns:
A Boolean, True if reset enabled, False if disabled
"""
try:
reg_file = open(self.RESET_PATH, "r")
except IOError as e:
print("Error: unable to open file: %s" % str(e))
reset_status_raw = self.__read_txt_file(self.RESET_PATH).rstrip()
if not reset_status_raw:
return False
content = reg_file.readline().rstrip()
reg_value = int(content, 16)
reg_value = int(reset_status_raw, 16)
bin_format = bin(reg_value)[2:].zfill(32)
return bin_format[::-1][self.index] == '0'
@ -375,16 +497,18 @@ class Sfp(SfpBase, SfpUtilBase):
A Boolean, True if SFP has RX LOS, False if not.
Note : RX LOS status is latched until a call to get_rx_los or a reset.
"""
rx_los = False
rx_los_list = []
dom_channel_monitor_raw = self.__read_eeprom_specific_bytes(
self.QSFP_CHANNL_RX_LOS_STATUS_OFFSET, self.QSFP_CHANNL_RX_LOS_STATUS_WIDTH) if self.get_presence() else None
QSFP_CHANNL_RX_LOS_STATUS_OFFSET, QSFP_CHANNL_RX_LOS_STATUS_WIDTH) if self.get_presence() else None
if dom_channel_monitor_raw is not None:
rx_los_data = int(dom_channel_monitor_raw[0], 16)
rx_los_list.append(rx_los_data & 0x01 != 0)
rx_los_list.append(rx_los_data & 0x02 != 0)
rx_los_list.append(rx_los_data & 0x04 != 0)
rx_los_list.append(rx_los_data & 0x08 != 0)
return rx_los_list
rx_los = rx_los_list[0] and rx_los_list[1] and rx_los_list[2] and rx_los_list[3]
return rx_los
def get_tx_fault(self):
"""
@ -393,16 +517,19 @@ class Sfp(SfpBase, SfpUtilBase):
A Boolean, True if SFP has TX fault, False if not
Note : TX fault status is lached until a call to get_tx_fault or a reset.
"""
tx_fault = False
tx_fault_list = []
dom_channel_monitor_raw = self.__read_eeprom_specific_bytes(
self.QSFP_CHANNL_TX_FAULT_STATUS_OFFSET, self.QSFP_CHANNL_TX_FAULT_STATUS_WIDTH) if self.get_presence() else None
QSFP_CHANNL_TX_FAULT_STATUS_OFFSET, QSFP_CHANNL_TX_FAULT_STATUS_WIDTH) if self.get_presence() else None
if dom_channel_monitor_raw is not None:
tx_fault_data = int(dom_channel_monitor_raw[0], 16)
tx_fault_list.append(tx_fault_data & 0x01 != 0)
tx_fault_list.append(tx_fault_data & 0x02 != 0)
tx_fault_list.append(tx_fault_data & 0x04 != 0)
tx_fault_list.append(tx_fault_data & 0x08 != 0)
return tx_fault_list
tx_fault = tx_fault_list[0] and tx_fault_list[1] and tx_fault_list[2] and tx_fault_list[3]
return tx_fault
def get_tx_disable(self):
"""
@ -417,7 +544,7 @@ class Sfp(SfpBase, SfpUtilBase):
return False
dom_control_raw = self.__read_eeprom_specific_bytes(
self.QSFP_CONTROL_OFFSET, self.QSFP_CONTROL_WIDTH) if self.get_presence() else None
QSFP_CONTROL_OFFSET, QSFP_CONTROL_WIDTH) if self.get_presence() else None
if dom_control_raw is not None:
dom_control_data = sfpd_obj.parse_control_bytes(dom_control_raw, 0)
tx_disable_list.append(
@ -455,7 +582,27 @@ class Sfp(SfpBase, SfpUtilBase):
Returns:
A Boolean, True if lpmode is enabled, False if disabled
"""
return self.get_low_power_mode(self.port_num)
try:
reg_file = open(self.LP_PATH, "r")
content = reg_file.readline().rstrip()
except IOError as e:
print("Error: unable to open file: %s" % str(e))
return False
# content is a string containing the hex representation of the register
reg_value = int(content, 16)
# Determind if port_num start from 1 or 0
bit_index = self.port_num - 1 if self.PORT_START == 1 else self.port_num
# Mask off the bit corresponding to our port
mask = (1 << bit_index)
# LPMode is active high
if reg_value & mask == 0:
return False
return True
def get_power_override(self):
"""
@ -471,7 +618,7 @@ class Sfp(SfpBase, SfpUtilBase):
return False
dom_control_raw = self.__read_eeprom_specific_bytes(
self.QSFP_CONTROL_OFFSET, self.QSFP_CONTROL_WIDTH) if self.get_presence() else None
QSFP_CONTROL_OFFSET, QSFP_CONTROL_WIDTH) if self.get_presence() else None
if dom_control_raw is not None:
dom_control_data = sfpd_obj.parse_control_bytes(dom_control_raw, 0)
power_override = (
@ -540,7 +687,7 @@ class Sfp(SfpBase, SfpUtilBase):
tx2_pw = transceiver_dom_info_dict.get("tx2power", "N/A")
tx3_pw = transceiver_dom_info_dict.get("tx3power", "N/A")
tx4_pw = transceiver_dom_info_dict.get("tx4power", "N/A")
return [tx1_pw, tx2_pw, tx3_pw, tx4_pw] if transceiver_dom_info_dict else []
return [tx1_pw, tx2_pw, tx3_pw, tx4_pw]
def reset(self):
"""
@ -563,7 +710,7 @@ class Sfp(SfpBase, SfpUtilBase):
reg_value = int(content, 16)
# Determind if port_num start from 1 or 0
bit_index = self.port_num - 1 if self.port_start == 1 else self.port_num
bit_index = self.port_num - 1 if self.PORT_START == 1 else self.port_num
# Mask off the bit corresponding to our port
mask = (1 << bit_index)
@ -610,7 +757,7 @@ class Sfp(SfpBase, SfpUtilBase):
# Write to eeprom
sysfsfile_eeprom = open(
self.port_to_eeprom_mapping[self.port_num], "r+b")
sysfsfile_eeprom.seek(self.QSFP_CONTROL_OFFSET)
sysfsfile_eeprom.seek(QSFP_CONTROL_OFFSET)
sysfsfile_eeprom.write(buffer[0])
except IOError as e:
print "Error: unable to open file: %s" % str(e)
@ -644,7 +791,7 @@ class Sfp(SfpBase, SfpUtilBase):
# Write to eeprom
sysfsfile_eeprom = open(
self.port_to_eeprom_mapping[self.port_num], "r+b")
sysfsfile_eeprom.seek(self.QSFP_CONTROL_OFFSET)
sysfsfile_eeprom.seek(QSFP_CONTROL_OFFSET)
sysfsfile_eeprom.write(buffer[0])
except IOError as e:
print "Error: unable to open file: %s" % str(e)
@ -664,7 +811,33 @@ class Sfp(SfpBase, SfpUtilBase):
Returns:
A boolean, True if lpmode is set successfully, False if not
"""
return self.set_low_power_mode(self.port_num, lpmode)
try:
reg_file = open(self.LP_PATH, "r+")
except IOError as e:
print("Error: unable to open file: %s" % str(e))
return False
content = reg_file.readline().rstrip()
# content is a string containing the hex representation of the register
reg_value = int(content, 16)
# Determind if port_num start from 1 or 0
bit_index = self.port_num - 1 if self.PORT_START == 1 else self.port_num
# Mask off the bit corresponding to our port
mask = (1 << bit_index)
# LPMode is active high; set or clear the bit accordingly
reg_value = reg_value | mask if lpmode else reg_value & ~mask
# Convert our register value back to a hex string and write back
content = hex(reg_value).strip('L')
reg_file.seek(0)
reg_file.write(content)
reg_file.close()
return True
def set_power_override(self, power_override, power_set):
"""
@ -697,7 +870,7 @@ class Sfp(SfpBase, SfpUtilBase):
# Write to eeprom
sysfsfile_eeprom = open(
self.port_to_eeprom_mapping[self.port_num], "r+b")
sysfsfile_eeprom.seek(self.QSFP_POWEROVERRIDE_OFFSET)
sysfsfile_eeprom.seek(QSFP_POWEROVERRIDE_OFFSET)
sysfsfile_eeprom.write(buffer[0])
except IOError as e:
print "Error: unable to open file: %s" % str(e)
@ -714,7 +887,11 @@ class Sfp(SfpBase, SfpUtilBase):
Returns:
string: The name of the device
"""
return self.logical[self.index]
sfputil_helper = SfpUtilHelper()
sfputil_helper.read_porttab_mappings(
self.__get_path_to_port_config_file())
name = sfputil_helper.logical[self.index] or "Unknown"
return name
def get_presence(self):
"""
@ -722,17 +899,15 @@ class Sfp(SfpBase, SfpUtilBase):
Returns:
bool: True if PSU is present, False if not
"""
try:
reg_file = open(self.PRS_PATH, "r")
except IOError as e:
print("Error: unable to open file: %s" % str(e))
presence_status_raw = self.__read_txt_file(self.PRS_PATH).rstrip()
if not presence_status_raw:
return False
content = reg_file.readline().rstrip()
content = presence_status_raw.rstrip()
reg_value = int(content, 16)
# Determind if port_num start from 1 or 0
bit_index = self.port_num - 1 if self.port_start == 1 else self.port_num
bit_index = self.port_num - 1 if self.PORT_START == 1 else self.port_num
# Mask off the bit corresponding to our port
mask = (1 << bit_index)
@ -760,3 +935,11 @@ class Sfp(SfpBase, SfpUtilBase):
"""
transceiver_dom_info_dict = self.get_transceiver_info()
return transceiver_dom_info_dict.get("serialnum", "N/A")
def get_status(self):
"""
Retrieves the operational status of the device
Returns:
A boolean value, True if device is operating properly, False if not
"""
return self.get_presence() and self.get_transceiver_bulk_status()