# sfputil.py # # Platform-specific SFP transceiver interface for SONiC # try: import time import os import traceback from sonic_sfp.sfputilbase import SfpUtilBase except ImportError as e: raise ImportError("%s - required module not found" % str(e)) class SfpUtil(SfpUtilBase): """Platform-specific SfpUtil class""" PORT_START = 1 PORT_END = 56 PORTS_IN_BLOCK = 57 EEPROM_OFFSET = 32 SFP_DEVICE_TYPE = "optoe2" QSFP_DEVICE_TYPE = "optoe1" I2C_MAX_ATTEMPT = 3 _port_to_eeprom_mapping = {} port_to_i2cbus_mapping ={} @property def port_start(self): return self.PORT_START @property def port_end(self): return self.PORT_END @property def qsfp_ports(self): return range(49, self.PORTS_IN_BLOCK) @property def port_to_eeprom_mapping(self): return self._port_to_eeprom_mapping def __init__(self): for x in range(self.PORT_START, self.PORTS_IN_BLOCK): self.port_to_i2cbus_mapping[x] = x + self.EEPROM_OFFSET - 1 SfpUtilBase.__init__(self) def _sfp_read_file_path(self, file_path, offset, num_bytes): attempts = 0 while attempts < self.I2C_MAX_ATTEMPT: try: file_path.seek(offset) read_buf = file_path.read(num_bytes) except Exception: attempts += 1 time.sleep(0.05) return True, read_buf return False, None def _sfp_eeprom_present(self, sysfs_sfp_i2c_client_eeprompath, offset): """Tries to read the eeprom file to determine if the device/sfp is present or not. If sfp present, the read returns valid bytes. If not, read returns error 'Connection timed out""" if not os.path.exists(sysfs_sfp_i2c_client_eeprompath): return False with open(sysfs_sfp_i2c_client_eeprompath, "rb", buffering=0) as sysfsfile: rv, buf = self._sfp_read_file_path(sysfsfile, offset, 1) return rv def _add_new_sfp_device(self, sysfs_sfp_i2c_adapter_path, devaddr, devtype): try: sysfs_nd_path = "%s/new_device" % sysfs_sfp_i2c_adapter_path # Write device address to new_device file nd_str = "%s %s" % (devtype, hex(devaddr)) with open(sysfs_nd_path, "w") as nd_file: nd_file.write(nd_str) except Exception as err: print("Error writing to new device file: %s" % str(err)) return 1 else: return 0 def _get_port_eeprom_path(self, port_num, devid): sysfs_i2c_adapter_base_path = "" if port_num in self.port_to_eeprom_mapping: sysfs_sfp_i2c_client_eeprom_path = self.port_to_eeprom_mapping[port_num] else: sysfs_i2c_adapter_base_path = "/sys/class/i2c-adapter" i2c_adapter_id = self._get_port_i2c_adapter_id(port_num) if i2c_adapter_id is None: print("Error getting i2c bus num") return None # Get i2c virtual bus path for the sfp sysfs_sfp_i2c_adapter_path = "%s/i2c-%s" % (sysfs_i2c_adapter_base_path, str(i2c_adapter_id)) # If i2c bus for port does not exist if not os.path.exists(sysfs_sfp_i2c_adapter_path): print("Could not find i2c bus %s. Driver not loaded?" % sysfs_sfp_i2c_adapter_path) return None sysfs_sfp_i2c_client_path = "%s/%s-00%s" % (sysfs_sfp_i2c_adapter_path, str(i2c_adapter_id), hex(devid)[-2:]) # If sfp device is not present on bus, Add it if not os.path.exists(sysfs_sfp_i2c_client_path): if port_num in self.qsfp_ports: ret = self._add_new_sfp_device( sysfs_sfp_i2c_adapter_path, devid, self.QSFP_DEVICE_TYPE) else: ret = self._add_new_sfp_device( sysfs_sfp_i2c_adapter_path, devid, self.SFP_DEVICE_TYPE) if ret != 0: print("Error adding sfp device") return None sysfs_sfp_i2c_client_eeprom_path = "%s/eeprom" % sysfs_sfp_i2c_client_path return sysfs_sfp_i2c_client_eeprom_path def _read_eeprom_specific_bytes(self, sysfsfile_eeprom, offset, num_bytes): eeprom_raw = [] for i in range(0, num_bytes): eeprom_raw.append("0x00") rv, raw = self._sfp_read_file_path(sysfsfile_eeprom, offset, num_bytes) if rv is False: return None try: for n in range(0, num_bytes): eeprom_raw[n] = hex(raw[n])[2:].zfill(2) except Exception: return None return eeprom_raw def get_eeprom_dom_raw(self, port_num): if port_num in self.qsfp_ports: # QSFP DOM EEPROM is also at addr 0x50 and thus also stored in eeprom_ifraw return None # Read dom eeprom at addr 0x51 return self._read_eeprom_devid(port_num, self.IDENTITY_EEPROM_ADDR, 256) def get_presence(self, port_num): # Check for invalid port_num if port_num < self.port_start or port_num > self.port_end: return False presence_path = "/sys/wb_plat/sff/sff%d/present" % port_num try: with open(presence_path, "rb") as data: presence_data = data.read(2) if presence_data == "": return False result = int(presence_data, 16) except IOError: return False if result == 1: return True return False def get_low_power_mode(self, port_num): # Check for invalid port_num return True def set_low_power_mode(self, port_num, lpmode): # Check for invalid port_num return True def reset(self, port_num): # Check for invalid port_num if port_num < self.port_start or port_num > self.port_end: return False return True def get_transceiver_change_event(self, timeout=0): return False, {} def get_highest_temperature(self): offset = 0 hightest_temperature = -9999 presence_flag = False read_eeprom_flag = False temperature_valid_flag = False for port in range(49, self.PORTS_IN_BLOCK): if self.get_presence(port) is False: continue presence_flag = True if port in self.qsfp_ports: offset = 22 else: offset = 96 eeprom_path = self._get_port_eeprom_path(port, 0x50) try: with open(eeprom_path, mode="rb", buffering=0) as eeprom: read_eeprom_flag = True eeprom_raw = self._read_eeprom_specific_bytes(eeprom, offset, 2) msb = int(eeprom_raw[0], 16) lsb = int(eeprom_raw[1], 16) result = (msb << 8) | (lsb & 0xff) result = float(result / 256.0) if -50 <= result <= 200: temperature_valid_flag = True hightest_temperature = max(hightest_temperature, result) except Exception: print(traceback.format_exc()) # all port not presence if presence_flag is False: hightest_temperature = -10000 # all port read eeprom fail elif read_eeprom_flag is False: hightest_temperature = -9999 # all port temperature invalid elif read_eeprom_flag is True and temperature_valid_flag is False: hightest_temperature = -10000 hightest_temperature = round(hightest_temperature, 2) return hightest_temperature