#!/usr/bin/env python try: import time from sonic_sfp.sfputilbase import SfpUtilBase except ImportError, e: raise ImportError (str(e) + "- required module not found") class SfpUtil(SfpUtilBase): """Platform specific SfpUtill class""" _port_start = 1 _port_end = 54 _qsfp_port_start = 49 _ports_in_block = 54 _port_to_eeprom_mapping = {} _global_port_pres_dict = {} _port_to_i2c_mapping = { 1 : 8, 2 : 9, 3 : 10, 4 : 11, 5 : 12, 6 : 13, 7 : 14, 8 : 15, 9 : 16, 10: 17, 11 : 18, 12 : 19, 13 : 20, 14 : 21, 15 : 22, 16 : 23, 17 : 24, 18 : 25, 19 : 26, 20 : 27, 21 : 28, 22 : 29, 23 : 30, 24 : 31, 25 : 32, 26 : 33, 27 : 34, 28 : 35, 29 : 36, 30 : 37, 31 : 38, 32 : 39, 33 : 40, 34 : 41, 35 : 42, 36 : 43, 37 : 44, 38 : 45, 39 : 46, 40 : 47, 41 : 48, 42 : 49, 43 : 50, 44 : 51, 45 : 52, 46 : 53, 47 : 54, 48 : 55, 49 : 56, 50 : 57, 51 : 60, 52 : 61, 53 : 62, 54 : 63, } _qsfp_ports = range(_qsfp_port_start, _ports_in_block + 1) def get_presence(self, port_num): # Check for invalid port_num if port_num < self._port_start or port_num > self._port_end: return False path = "/sys/bus/i2c/devices/{0}-0050/sfp_is_present" port_ps = path.format(self._port_to_i2c_mapping[port_num]) try: reg_file = open(port_ps) except IOError as e: print "Error: unable to open file: %s" % str(e) return False try: reg_value = reg_file.readline().rstrip() except IOError as e: time.sleep(1) try: reg_value = reg_file.readline().rstrip() except IOError as e: print "Error:try again to read file failed: %s %s" % (str(e), port_ps) reg_file.close() return False reg_file.close() if reg_value == '1': return True reg_file.close() if reg_value == '1': return True return False def init_global_port_presence(self): for port_num in range(self.port_start, (self.port_end + 1)): self._global_port_pres_dict[port_num] = '0' def __init__(self): eeprom_path = '/sys/bus/i2c/devices/{0}-0050/sfp_eeprom' for x in range(self._port_start, self._port_end + 1): port_eeprom_path = eeprom_path.format(self._port_to_i2c_mapping[x]) self._port_to_eeprom_mapping[x] = port_eeprom_path self.init_global_port_presence() SfpUtilBase.__init__(self) def reset(self, port_num): # Check for invalid port_num if port_num < self._port_start or port_num > self._port_end: return False path = "/sys/bus/i2c/devices/{0}-0050/sfp_port_reset" port_ps = path.format(self._port_to_i2c_mapping[port_num]) try: reg_file = open(port_ps, 'w') except IOError as e: print "Error: unable to open file: %s" % str(e) return False #toggle reset reg_file.seek(0) reg_file.write('1') time.sleep(1) reg_file.seek(0) reg_file.write('0') reg_file.close() return True def set_low_power_mode(self, port_num, lpmode): # Check for invalid port_num if port_num < self._qsfp_port_start or port_num > self._port_end: return False pre_value = self.get_presence(port_num) if pre_value == False: return False path = "/sys/bus/i2c/devices/{0}-0050/sfp_lpmode" port_ps = path.format(self._port_to_i2c_mapping[port_num]) try: reg_file = open(port_ps,'w') except IOError as e: print "Error: unable to open file: %s" % str(e) return False reg_file.seek(0) if lpmode == 1: reg_file.write('1') elif lpmode == 0: reg_file.write('0') reg_file.close() return True def get_low_power_mode(self, port_num): # Check for invalid port_num if port_num < self._qsfp_port_start or port_num > self._port_end: return False pre_value = self.get_presence(port_num) if pre_value == False: return False path = "/sys/bus/i2c/devices/{0}-0050/sfp_lpmode" port_ps = path.format(self._port_to_i2c_mapping[port_num]) try: reg_file = open(port_ps) except IOError as e: print "Error: unable to open file:%s %s" % (str(e), port_ps) return False try: reg_value = reg_file.readline().rstrip() except IOError as e: print "Error: unable to open file:%s %s" % (str(e), port_ps) reg_file.close() return False reg_file.close() if reg_value == '1': return True return False def get_transceiver_change_event(self): port_dict = {} while True: for port_num in range(self.port_start, (self.port_end + 1)): presence = self.get_presence(port_num) if(presence and self._global_port_pres_dict[port_num] == '0'): self._global_port_pres_dict[port_num] = '1' port_dict[port_num] = '1' elif(not presence and self._global_port_pres_dict[port_num] == '1'): self._global_port_pres_dict[port_num] = '0' port_dict[port_num] = '0' if(len(port_dict) > 0): return True, port_dict time.sleep(0.5) @property def port_start(self): return self._port_start @property def port_end(self): return self._port_end @property def qsfp_ports(self): return self._qsfp_ports @property def port_to_eeprom_mapping(self): return self._port_to_eeprom_mapping