DellEMC Z9332f: Fix SFP issue (#11819)

* Update sfp

* Update sfp

* Update sfp.py
This commit is contained in:
Aravind Mani 2022-09-07 22:05:59 +05:30 committed by GitHub
parent edc4485d30
commit 37d34ddd34
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -70,7 +70,7 @@ SFP_INFO_OFFSET = 0
SFP_DOM_OFFSET = 256 SFP_DOM_OFFSET = 256
SFP_STATUS_CONTROL_OFFSET = 110 SFP_STATUS_CONTROL_OFFSET = 110
SFP_STATUS_CONTROL_WIDTH = 7 SFP_STATUS_CONTROL_WIDTH = 1
SFP_TX_DISABLE_HARD_BIT = 7 SFP_TX_DISABLE_HARD_BIT = 7
SFP_TX_DISABLE_SOFT_BIT = 6 SFP_TX_DISABLE_SOFT_BIT = 6
@ -912,15 +912,14 @@ class Sfp(SfpBase):
rx_los_list = [] rx_los_list = []
try: try:
if self.sfp_type == 'QSFP_DD': if self.sfp_type == 'QSFP_DD':
offset = 512 offset = 2176
rx_los_mask = [ 0x01, 0x02, 0x04, 0x08 ,0x10, 0x20, 0x40, 0x80 ] rx_los_mask = [ 0x01, 0x02, 0x04, 0x08 ,0x10, 0x20, 0x40, 0x80 ]
dom_channel_monitor_raw = self._read_eeprom_bytes(self.eeprom_path, dom_channel_monitor_raw = self._read_eeprom_bytes(self.eeprom_path,
offset + QSFP_DD_RXLOS_OFFSET, QSFP_DD_RXLOS_WIDTH) offset + 147, 1)
if dom_channel_monitor_raw is not None: if dom_channel_monitor_raw is not None:
rx_los_data = int(dom_channel_monitor_raw[0], 8) rx_los_data = int(dom_channel_monitor_raw[0], 8)
for mask in rx_los_mask: for mask in rx_los_mask:
rx_los_list.append(rx_los_data & mask != 0) rx_los_list.append(bool(rx_los_data & mask != 0))
elif self.sfp_type == 'QSFP': elif self.sfp_type == 'QSFP':
rx_los_data = self._get_eeprom_data('rx_los') rx_los_data = self._get_eeprom_data('rx_los')
# As the function expects a single boolean, if any one channel experience LOS, # As the function expects a single boolean, if any one channel experience LOS,
@ -962,13 +961,19 @@ class Sfp(SfpBase):
tx_disable_list = [] tx_disable_list = []
try: try:
if self.sfp_type == 'QSFP_DD': if self.sfp_type == 'QSFP_DD':
return False dom_channel_monitor_raw = self._read_eeprom_bytes(self.eeprom_path,2048 + 130, 1)
if dom_channel_monitor_raw is not None:
tx_disable_data = int(dom_channel_monitor_raw[0], 16)
for i in range(8):
tx_disable_list.append(bool(tx_disable_data & (1 << i) != 0))
elif self.sfp_type == 'QSFP': elif self.sfp_type == 'QSFP':
tx_disable_data = self._get_eeprom_data('tx_disable') tx_disable_data = self._get_eeprom_data('tx_disable')
for tx_disable_id in ('Tx1Disable', 'Tx2Disable', 'Tx3Disable', 'Tx4Disable'): for tx_disable_id in ('Tx1Disable', 'Tx2Disable', 'Tx3Disable', 'Tx4Disable'):
tx_disable_list.append(tx_disable_data['data'][tx_disable_id]['value'] == 'On') tx_disable_list.append(tx_disable_data['data'][tx_disable_id]['value'] == 'On')
else: else:
tx_disable_data = self._read_eeprom_bytes(self.eeprom_path, SFP_STATUS_CONTROL_OFFSET, SFP_STATUS_CONTROL_WIDTH) offset = 256
tx_disable_data = self._read_eeprom_bytes(self.eeprom_path, offset + SFP_STATUS_CONTROL_OFFSET, SFP_STATUS_CONTROL_WIDTH)
data = int(tx_disable_data[0], 16) data = int(tx_disable_data[0], 16)
tx_disable_hard = (sffbase().test_bit(data, SFP_TX_DISABLE_HARD_BIT) != 0) tx_disable_hard = (sffbase().test_bit(data, SFP_TX_DISABLE_HARD_BIT) != 0)
tx_disable_soft = (sffbase().test_bit(data, SFP_TX_DISABLE_SOFT_BIT) != 0) tx_disable_soft = (sffbase().test_bit(data, SFP_TX_DISABLE_SOFT_BIT) != 0)
@ -977,19 +982,41 @@ class Sfp(SfpBase):
return 'N/A' return 'N/A'
return tx_disable_list return tx_disable_list
def tx_disable(self, tx_disable):
if self.sfp_type == 'QSFP_DD':
val = 0xff if tx_disable else 0x0
return self._write_eeprom_bytes(2048 + 130, 1, bytearray([val]))
elif self.sfp_type == 'QSFP':
val = 0xf if tx_disable else 0x0
return self._write_eeprom_bytes( 0 + 86, 1, bytearray([val]))
else:
offset = 256
if tx_disable is True:
val = 64
else:
val = 191
return self._write_eeprom_bytes(offset + SFP_STATUS_CONTROL_OFFSET, SFP_STATUS_CONTROL_WIDTH, bytearray([val]))
def get_tx_disable_channel(self): def get_tx_disable_channel(self):
""" """
Retrieves the TX disabled channels in this SFP Retrieves the TX disabled channels in this SFP
""" """
tx_disable_channel = 0 tx_disable_channel = None
try: try:
if self.sfp_type == 'QSFP_DD': if self.sfp_type == 'QSFP_DD':
tx_disable_channel = 0 offset = 2048
dom_channel_monitor_raw = self._read_eeprom_bytes(self.eeprom_path, offset + 130, 1)
if dom_channel_monitor_raw is not None:
tx_disable_channel = int(dom_channel_monitor_raw[0], 16)
elif self.sfp_type == 'QSFP': elif self.sfp_type == 'QSFP':
tx_disable_data = self._get_eeprom_data('tx_disable') tx_disable_data = self._get_eeprom_data('tx_disable')
for tx_disable_id in ('Tx1Disable', 'Tx2Disable', 'Tx3Disable', 'Tx4Disable'): for tx_disable_id in ('Tx1Disable', 'Tx2Disable', 'Tx3Disable', 'Tx4Disable'):
tx_disable_channel <<= 1 tx_disable_channel <<= 1
tx_disable_channel |= (tx_disable_data['data']['Tx1Disable']['value'] == 'On') tx_disable_channel |= (tx_disable_data['data']['Tx1Disable']['value'] == 'On')
else:
tx_disable_channel = int(self.get_tx_disable()[0])
except (TypeError, ValueError): except (TypeError, ValueError):
return 'N/A' return 'N/A'
return tx_disable_channel return tx_disable_channel
@ -1091,7 +1118,7 @@ class Sfp(SfpBase):
return None return None
if not self.qsfp_dd_txbias_supported: if not self.qsfp_dd_txbias_supported:
for lane in range(0, 8): for lane in range(0, 8):
tx_bias_list.append("N/A") tx_bias_list.append(False)
return tx_bias_list return tx_bias_list
tx_bias_data_raw = self._read_eeprom_bytes(self.eeprom_path, offset + QSFP_DD_TXBIAS_OFFSET, QSFP_DD_TXBIAS_WIDTH) tx_bias_data_raw = self._read_eeprom_bytes(self.eeprom_path, offset + QSFP_DD_TXBIAS_OFFSET, QSFP_DD_TXBIAS_WIDTH)
@ -1128,7 +1155,7 @@ class Sfp(SfpBase):
return None return None
if not self.qsfp_dd_rxpower_supported: if not self.qsfp_dd_rxpower_supported:
for lane in range(0, 8): for lane in range(0, 8):
rx_power_list.append("N/A") rx_power_list.append(False)
return rx_power_list return rx_power_list
offset = QSFP_DD_PAGE17 offset = QSFP_DD_PAGE17
@ -1165,7 +1192,7 @@ class Sfp(SfpBase):
return None return None
if not self.qsfp_dd_txpower_supported: if not self.qsfp_dd_txpower_supported:
for lane in range(0, 8): for lane in range(0, 8):
tx_power_list.append("N/A") tx_power_list.append(False)
return tx_power_list return tx_power_list
offset = QSFP_DD_PAGE17 offset = QSFP_DD_PAGE17
@ -1293,17 +1320,45 @@ class Sfp(SfpBase):
except ValueError: pass except ValueError: pass
return intl_state return intl_state
def tx_disable(self, tx_disable):
"""
Disable SFP TX for all channels
"""
return False
def tx_disable_channel(self, channel, disable): def tx_disable_channel(self, channel, disable):
""" """
Sets the tx_disable for specified SFP channels Enables/Disables TX channel for SFP based on disable flag
""" """
return False channel_state = self.get_tx_disable_channel()
if channel_state is None or channel_state == 'N/A':
return False
if self.sfp_type == 'QSFP_DD':
for i in range(8):
mask = (1 << i)
if not (channel & mask):
continue
if disable:
channel_state |= mask
else:
channel_state &= ~mask
return self._write_eeprom_bytes(2048 + 130, 1, bytearray([channel_state]))
elif self.sfp_type == 'QSFP':
for i in range(4):
mask = (1 << i)
if not (channel & mask):
continue
if disable:
channel_state |= mask
else:
channel_state &= ~mask
return self._write_eeprom_bytes(86, 1, bytearray([channel_state]))
else:
channel_state = self.get_tx_disable_channel()
if channel_state is None or channel_state == "N/A":
return False
offset = 256
mask = (1 << 0)
if disable:
channel_state |= mask
else:
channel_state &= ~mask
return self._write_eeprom_bytes(offset + SFP_STATUS_CONTROL_OFFSET, SFP_STATUS_CONTROL_WIDTH, bytearray([channel_state]))
def set_power_override(self, power_override, power_set): def set_power_override(self, power_override, power_set):
""" """