sonic-buildimage/device/celestica/x86_64-cel_seastone-r0/sonic_platform/component.py

217 lines
7.2 KiB
Python
Raw Normal View History

#############################################################################
# Celestica
#
# Component contains an implementation of SONiC Platform Base API and
# provides the components firmware management function
#
#############################################################################
import os.path
import shutil
try:
from sonic_platform_base.component_base import ComponentBase
[sonic-utilities] Update submodule; Build and install as a Python 3 wheel (#5926) Submodule updates include the following commits: * src/sonic-utilities 9dc58ea...f9eb739 (18): > Remove unnecessary calls to str.encode() now that the package is Python 3; Fix deprecation warning (#1260) > [generate_dump] Ignoring file/directory not found Errors (#1201) > Fixed porstat rate and util issues (#1140) > fix error: interface counters is mismatch after warm-reboot (#1099) > Remove unnecessary calls to str.decode() now that the package is Python 3 (#1255) > [acl-loader] Make list sorting compliant with Python 3 (#1257) > Replace hard-coded fast-reboot with variable. And some typo corrections (#1254) > [configlet][portconfig] Remove calls to dict.has_key() which is not available in Python 3 (#1247) > Remove unnecessary conversions to list() and calls to dict.keys() (#1243) > Clean up LGTM alerts (#1239) > Add 'requests' as install dependency in setup.py (#1240) > Convert to Python 3 (#1128) > Fix mock SonicV2Connector in python3: use decode_responses mode so caller code will be the same as python2 (#1238) > [tests] Do not trim from PATH if we did not append to it; Clean up/fix shebangs in scripts (#1233) > Updates to bgp config and show commands with BGP_INTERNAL_NEIGHBOR table (#1224) > [cli]: NAT show commands newline issue after migrated to Python3 (#1204) > [doc]: Update Command-Reference.md (#1231) > Added 'import sys' in feature.py file (#1232) * src/sonic-py-swsssdk 9d9f0c6...1664be9 (2): > Fix: no need to decode() after redis client scan, so it will work for both python2 and python3 (#96) > FieldValueMap `contains`(`in`) will also work when migrated to libswsscommon(C++ with SWIG wrapper) (#94) - Also fix Python 3-related issues: - Use integer (floor) division in config_samples.py (sonic-config-engine) - Replace print statement with print function in eeprom.py plugin for x86_64-kvm_x86_64-r0 platform - Update all platform plugins to be compatible with both Python 2 
and Python 3 - Remove shebangs from plugins files which are not intended to be executable - Replace tabs with spaces in Python plugin files and fix alignment, because Python 3 is more strict - Remove trailing whitespace from plugins files
2020-11-25 12:28:36 -06:00
from .helper import APIHelper
except ImportError as e:
raise ImportError(str(e) + "- required module not found")
# CPLD name -> register address string handed to
# APIHelper.get_cpld_reg_value() when reading that CPLD's version.
CPLD_ADDR_MAPPING = {
    "CPLD1": "0x100",
    "CPLD2": "0x200",
    "CPLD3": "0x280",
    "CPLD4": "0x300",
    "CPLD5": "0x380",
}

# File passed to APIHelper.get_cpld_reg_value() for CPLD register reads.
GETREG_PATH = "/sys/devices/platform/dx010_cpld/getreg"

# File whose (stripped) content is reported as the BIOS firmware version.
BIOS_VERSION_PATH = "/sys/class/dmi/id/bios_version"

# Component names, looked up by component index.
COMPONENT_NAME_LIST = [
    "CPLD1",
    "CPLD2",
    "CPLD3",
    "CPLD4",
    "CPLD5",
    "BIOS",
]

# Component descriptions, looked up by the same component index as
# COMPONENT_NAME_LIST (entries are positionally paired).
COMPONENT_DES_LIST = [
    "Used for managing the CPU",
    "Used for managing QSFP+ ports (1-10)",
    "Used for managing QSFP+ ports (11-21)",
    "Used for misc status and control",
    "Used for managing QSFP+ ports (22-32)",
    "Basic Input/Output System",
]
class Component(ComponentBase):
    """Platform-specific Component class.

    Each instance represents one entry of COMPONENT_NAME_LIST (the CPLDs
    and the BIOS), selected by its index, and exposes firmware version
    and firmware installation operations for it.
    """

    DEVICE_TYPE = "component"

    def __init__(self, component_index):
        """
        Args:
            component_index: An integer, index of this component in
                COMPONENT_NAME_LIST / COMPONENT_DES_LIST
        """
        ComponentBase.__init__(self)
        self.index = component_index
        self._api_helper = APIHelper()
        # Cached once so the other methods can dispatch on the name.
        self.name = self.get_name()

    def __get_bios_version(self):
        """
        Retrieves the BIOS firmware version
        Returns:
            A string, the stripped content of BIOS_VERSION_PATH, or None
            if the file could not be read
        """
        try:
            with open(BIOS_VERSION_PATH, 'r') as fd:
                bios_version = fd.read()
            return bios_version.strip()
        except Exception:
            # Best effort: any read failure is reported as "no version".
            return None

    def __read_cpld_version(self, cpld_name):
        """
        Retrieves the firmware version of a single CPLD
        Args:
            cpld_name: A string, key of CPLD_ADDR_MAPPING
        Returns:
            None if cpld_name is not a key of CPLD_ADDR_MAPPING.
            Otherwise a string "<a>.<b>" built from the characters at
            index 2 and 3 of the raw register value, each parsed as a hex
            digit (presumably the two digits after a "0x" prefix), or
            the string 'None' if the register could not be read or parsed
        """
        cpld_addr = CPLD_ADDR_MAPPING.get(cpld_name)
        if cpld_addr is None:
            return None
        try:
            # NOTE(review): the change history embedded in this listing
            # says get_cpld_reg_value serializes access to the getreg file
            # with fcntl.flock -- confirm in helper.py.
            cpld_version_raw = self._api_helper.get_cpld_reg_value(
                GETREG_PATH, cpld_addr)
            if cpld_version_raw is None:
                return 'None'
            return "{}.{}".format(int(cpld_version_raw[2], 16),
                                  int(cpld_version_raw[3], 16))
        except Exception:
            # Best effort: a failing or malformed read yields 'None'.
            return 'None'

    def __get_cpld_version(self):
        """
        Retrieves the firmware version of every CPLD
        Returns:
            A dict mapping each key of CPLD_ADDR_MAPPING to its version
            string (the string 'None' when it could not be read)
        """
        cpld_version = dict()
        for cpld_name in CPLD_ADDR_MAPPING:
            cpld_version[cpld_name] = self.__read_cpld_version(cpld_name)
        return cpld_version

    def get_name(self):
        """
        Retrieves the name of the component
        Returns:
            A string containing the name of the component
        """
        return COMPONENT_NAME_LIST[self.index]

    def get_description(self):
        """
        Retrieves the description of the component
        Returns:
            A string containing the description of the component
        """
        return COMPONENT_DES_LIST[self.index]

    def get_firmware_version(self):
        """
        Retrieves the firmware version of the component
        Returns:
            string: The firmware version of the component, or None if it
            is unknown for this component
        """
        if self.name == "BIOS":
            return self.__get_bios_version()
        if "CPLD" in self.name:
            # Only this component's register is read; querying every CPLD
            # just to pick one entry would cost needless register reads.
            return self.__read_cpld_version(self.name)
        return None

    def get_available_firmware_version(self, image_path):
        """
        Retrieves the available firmware version of the component
        Note: the image is not inspected here; a fixed placeholder is
        returned
        Args:
            image_path: A string, path to firmware image
        Returns:
            A string, always "N/A"
        """
        return "N/A"

    def get_firmware_update_notification(self, image_path):
        """
        Retrieves a notification on what should be done in order to complete
        the component firmware update
        Args:
            image_path: A string, path to firmware image
        Returns:
            A string, always "None" (the literal string, not the None
            object), indicating that no actions are required
        """
        return "None"

    def install_firmware(self, image_path):
        """
        Install firmware to the component
        Only CPLD components are handled: the image is copied to /tmp
        under a lower-cased file name (with ".vme" appended when the name
        has no extension) and "ispvm" is run on the copy.
        Args:
            image_path: A string, path to firmware image
        Returns:
            False if image_path is not an existing file or the component
            is not a CPLD; otherwise the result of
            APIHelper.run_command() (presumably a boolean success flag --
            confirm in helper.py)
        """
        if not os.path.isfile(image_path):
            return False

        if "CPLD" not in self.name:
            # No install command exists for non-CPLD components (BIOS
            # flashing is not implemented here). Report failure instead of
            # falling through with no command to run.
            return False

        img_name = os.path.basename(image_path)
        root, ext = os.path.splitext(img_name)
        ext = ".vme" if ext == "" else ext
        new_image_path = os.path.join("/tmp", (root.lower() + ext))
        shutil.copy(image_path, new_image_path)
        install_command = ["ispvm", str(new_image_path)]
        return self._api_helper.run_command(install_command)

    def update_firmware(self, image_path):
        """
        Updates firmware of the component
        Not implemented for this platform: nothing is installed and no
        exception is raised.
        Args:
            image_path: A string, path to firmware image
        Returns:
            A boolean, always False
        """
        return False

    ##############################################################
    ###################### Device methods ########################
    ##############################################################

    def get_presence(self):
        """
        Retrieves the presence of the component
        Returns:
            bool: always True
        """
        return True

    def get_model(self):
        """
        Retrieves the model number (or part number) of the device
        Returns:
            string: always 'N/A'
        """
        return 'N/A'

    def get_serial(self):
        """
        Retrieves the serial number of the device
        Returns:
            string: always 'N/A'
        """
        return 'N/A'

    def get_status(self):
        """
        Retrieves the operational status of the device
        Returns:
            A boolean value, always True
        """
        return True

    def get_position_in_parent(self):
        """
        Retrieves 1-based relative physical position in parent device.
        Returns:
            integer: always -1, i.e. the position cannot be determined
        """
        return -1

    def is_replaceable(self):
        """
        Indicate whether this device is replaceable.
        Returns:
            bool: always False
        """
        return False