sonic-buildimage/platform/mellanox/mlnx-platform-api/sonic_platform/component.py
Alexander Allen 29601366ee
[Mellanox] Implement auto_firmware_update platform API for to support fwutil auto-update (#7721)
Why I did it
The Mellanox platform is required to support the fwutil auto-update feature defined here

This is to allow switches, when performing SONiC upgrades to choose whether to perform firmware upgrades that may interrupt the data plane through a cold boot.

How I did it
Two methods were added to the component implementations for mellanox.

In the base Component class we add a default function that chooses to skip the installation of any firmware unless the cold boot option is provided. This is because the Mellanox platform, by default, does not support installing firmware on ONIE, the CPLD, or the BIOS "on-the-fly".

In the ComponentSSD class we add a function that behaves similarly but uses the Mellanox specific SSD firmware upgrade tool to check if the current SSD supports being upgraded on the fly in order to decide whether to skip or perform the installation.

How to verify it
Unit tests are included with this PR. These test will run on build of target sonic-mellanox.bin

You may also perform fwutil auto-update ... commands after Azure/sonic-utilities#1242 is merged in.
2021-06-16 14:55:20 -07:00

813 lines
30 KiB
Python

#############################################################################
# Mellanox
#
# implementation of new platform api
#############################################################################
try:
import os
import io
import re
import sys
import glob
import tempfile
import subprocess
if sys.version_info[0] > 2:
import configparser
else:
import ConfigParser as configparser
from sonic_platform_base.component_base import ComponentBase, \
FW_AUTO_INSTALLED, \
FW_AUTO_ERR_BOOT_TYPE, \
FW_AUTO_ERR_IMAGE, \
FW_AUTO_ERR_UKNOWN
except ImportError as e:
raise ImportError(str(e) + "- required module not found")
class MPFAManager(object):
MPFA_EXTENSION = '.mpfa'
MPFA_EXTRACT_COMMAND = 'tar xzf {} -C {}'
MPFA_CLEANUP_COMMAND = 'rm -rf {}'
def __init__(self, mpfa_path):
self.__mpfa_path = mpfa_path
self.__contents_path = None
self.__metadata = None
def __enter__(self):
self.extract()
return self
def __exit__(self, exc_type, exc_value, traceback):
self.cleanup()
def __validate_path(self, mpfa_path):
if not os.path.isfile(mpfa_path):
raise RuntimeError("MPFA doesn't exist: path={}".format(mpfa_path))
name, ext = os.path.splitext(mpfa_path)
if ext != self.MPFA_EXTENSION:
raise RuntimeError("MPFA doesn't have valid extension: path={}".format(mpfa_path))
def __extract_contents(self, mpfa_path):
contents_path = tempfile.mkdtemp(prefix='mpfa-')
cmd = self.MPFA_EXTRACT_COMMAND.format(mpfa_path, contents_path)
subprocess.check_call(cmd.split(), universal_newlines=True)
self.__contents_path = contents_path
def __parse_metadata(self, contents_path):
metadata_path = os.path.join(contents_path, 'metadata.ini')
if not os.path.isfile(metadata_path):
raise RuntimeError("MPFA metadata doesn't exist: path={}".format(metadata_path))
cp = configparser.ConfigParser()
with io.open(metadata_path, 'r') as metadata_ini:
cp.readfp(metadata_ini)
self.__metadata = cp
def extract(self):
if self.is_extracted():
return
self.__validate_path(self.__mpfa_path)
self.__extract_contents(self.__mpfa_path)
self.__parse_metadata(self.__contents_path)
def cleanup(self):
if os.path.exists(self.__contents_path):
cmd = self.MPFA_CLEANUP_COMMAND.format(self.__contents_path)
subprocess.check_call(cmd.split(), universal_newlines=True)
self.__contents_path = None
self.__metadata = None
def get_path(self):
return self.__contents_path
def get_metadata(self):
return self.__metadata
def is_extracted(self):
return self.__contents_path is not None and os.path.exists(self.__contents_path)
class ONIEUpdater(object):
ONIE_FW_UPDATE_CMD_ADD = '/usr/bin/mlnx-onie-fw-update.sh add {}'
ONIE_FW_UPDATE_CMD_REMOVE = '/usr/bin/mlnx-onie-fw-update.sh remove {}'
ONIE_FW_UPDATE_CMD_UPDATE = '/usr/bin/mlnx-onie-fw-update.sh update'
ONIE_FW_UPDATE_CMD_SHOW_PENDING = '/usr/bin/mlnx-onie-fw-update.sh show-pending'
ONIE_VERSION_PARSE_PATTERN = '([0-9]{4})\.([0-9]{2})-([0-9]+)\.([0-9]+)\.([0-9]+)-([0-9]+)'
ONIE_VERSION_BASE_PARSE_PATTERN = '([0-9]+)\.([0-9]+)\.([0-9]+)'
ONIE_VERSION_REQUIRED = '5.2.0016'
ONIE_VERSION_ATTR = 'onie_version'
ONIE_NO_PENDING_UPDATES_ATTR = 'No pending firmware updates present'
ONIE_IMAGE_INFO_COMMAND = '/bin/bash {} -q -i'
def __mount_onie_fs(self):
fs_mountpoint = '/mnt/onie-fs'
onie_path = '/lib/onie'
if os.path.lexists(onie_path) or os.path.exists(fs_mountpoint):
self.__umount_onie_fs()
cmd = "fdisk -l | grep 'ONIE boot' | awk '{print $1}'"
fs_path = subprocess.check_output(cmd,
stderr=subprocess.STDOUT,
shell=True,
universal_newlines=True).rstrip('\n')
os.mkdir(fs_mountpoint)
cmd = "mount -n -r -t ext4 {} {}".format(fs_path, fs_mountpoint)
subprocess.check_call(cmd, shell=True, universal_newlines=True)
fs_onie_path = os.path.join(fs_mountpoint, 'onie/tools/lib/onie')
os.symlink(fs_onie_path, onie_path)
return fs_mountpoint
def __umount_onie_fs(self):
fs_mountpoint = '/mnt/onie-fs'
onie_path = '/lib/onie'
if os.path.islink(onie_path):
os.unlink(onie_path)
if os.path.ismount(fs_mountpoint):
cmd = "umount -rf {}".format(fs_mountpoint)
subprocess.check_call(cmd, shell=True, universal_newlines=True)
if os.path.exists(fs_mountpoint):
os.rmdir(fs_mountpoint)
def __stage_update(self, image_path):
cmd = self.ONIE_FW_UPDATE_CMD_ADD.format(image_path)
try:
subprocess.check_call(cmd.split(), universal_newlines=True)
except subprocess.CalledProcessError as e:
raise RuntimeError("Failed to stage firmware update: {}".format(str(e)))
def __unstage_update(self, image_path):
cmd = self.ONIE_FW_UPDATE_CMD_REMOVE.format(os.path.basename(image_path))
try:
subprocess.check_call(cmd.split(), universal_newlines=True)
except subprocess.CalledProcessError as e:
raise RuntimeError("Failed to unstage firmware update: {}".format(str(e)))
def __trigger_update(self):
cmd = self.ONIE_FW_UPDATE_CMD_UPDATE
try:
subprocess.check_call(cmd.split(), universal_newlines=True)
except subprocess.CalledProcessError as e:
raise RuntimeError("Failed to trigger firmware update: {}".format(str(e)))
def __is_update_staged(self, image_path):
cmd = self.ONIE_FW_UPDATE_CMD_SHOW_PENDING
try:
output = subprocess.check_output(cmd.split(),
stderr=subprocess.STDOUT,
universal_newlines=True).rstrip('\n')
except subprocess.CalledProcessError as e:
raise RuntimeError("Failed to get pending firmware updates: {}".format(str(e)))
basename = os.path.basename(image_path)
for line in output.splitlines():
if line.startswith(basename):
return True
return False
def parse_onie_version(self, version, is_base=False):
onie_year = None
onie_month = None
onie_major = None
onie_minor = None
onie_release = None
onie_baudrate = None
if is_base:
pattern = self.ONIE_VERSION_BASE_PARSE_PATTERN
m = re.search(pattern, version)
if not m:
raise RuntimeError("Failed to parse ONIE version: pattern={}, version={}".format(pattern, version))
onie_major = m.group(1)
onie_minor = m.group(2)
onie_release = m.group(3)
return onie_year, onie_month, onie_major, onie_minor, onie_release, onie_baudrate
pattern = self.ONIE_VERSION_PARSE_PATTERN
m = re.search(pattern, version)
if not m:
raise RuntimeError("Failed to parse ONIE version: pattern={}, version={}".format(pattern, version))
onie_year = m.group(1)
onie_month = m.group(2)
onie_major = m.group(3)
onie_minor = m.group(4)
onie_release = m.group(5)
onie_baudrate = m.group(6)
return onie_year, onie_month, onie_major, onie_minor, onie_release, onie_baudrate
def get_onie_required_version(self):
return self.ONIE_VERSION_REQUIRED
def get_onie_version(self):
version = None
try:
fs_mountpoint = self.__mount_onie_fs()
machine_conf_path = os.path.join(fs_mountpoint, 'onie/grub/grub-machine.cfg')
with open(machine_conf_path, 'r') as machine_conf:
for line in machine_conf:
if line.startswith(self.ONIE_VERSION_ATTR):
items = line.rstrip('\n').split('=')
if len(items) != 2:
raise RuntimeError("Failed to parse ONIE info: line={}".format(line))
version = items[1]
break
if version is None:
raise RuntimeError("Failed to parse ONIE version")
finally:
self.__umount_onie_fs()
return version
def get_onie_firmware_info(self, image_path):
firmware_info = { }
try:
self.__mount_onie_fs()
cmd = self.ONIE_IMAGE_INFO_COMMAND.format(image_path)
try:
output = subprocess.check_output(cmd.split(),
stderr=subprocess.STDOUT,
universal_newlines=True).rstrip('\n')
except subprocess.CalledProcessError as e:
raise RuntimeError("Failed to get ONIE firmware info: {}".format(str(e)))
for line in output.splitlines():
items = line.split('=')
if len(items) != 2:
raise RuntimeError("Failed to parse ONIE firmware info: line={}".format(line))
firmware_info[items[0]] = items[1]
finally:
self.__umount_onie_fs()
return firmware_info
def update_firmware(self, image_path):
cmd = self.ONIE_FW_UPDATE_CMD_SHOW_PENDING
try:
output = subprocess.check_output(cmd.split(),
stderr=subprocess.STDOUT,
universal_newlines=True).rstrip('\n')
except subprocess.CalledProcessError as e:
raise RuntimeError("Failed to get pending firmware updates: {}".format(str(e)))
no_pending_updates = False
for line in output.splitlines():
if line.startswith(self.ONIE_NO_PENDING_UPDATES_ATTR):
no_pending_updates = True
break
if not no_pending_updates:
raise RuntimeError("Failed to complete firmware update: pending updates are present")
try:
self.__stage_update(image_path)
self.__trigger_update()
except:
if self.__is_update_staged(image_path):
self.__unstage_update(image_path)
raise
def is_non_onie_firmware_update_supported(self):
current_version = self.get_onie_version()
_, _, major1, minor1, release1, _ = self.parse_onie_version(current_version)
version1 = int("{}{}{}".format(major1, minor1, release1))
required_version = self.get_onie_required_version()
_, _, major2, minor2, release2, _ = self.parse_onie_version(required_version, True)
version2 = int("{}{}{}".format(major2, minor2, release2))
return version1 >= version2
class Component(ComponentBase):
def __init__(self):
super(Component, self).__init__()
self.name = None
self.description = None
self.image_ext_name = None
def get_name(self):
return self.name
def get_description(self):
return self.description
def auto_update_firmware(self, image_path, boot_action):
"""
Default handling of attempted automatic update for a component of a Mellanox switch.
Will skip the installation if the boot_action is 'warm' or 'fast' and will call update_firmware()
if boot_action is fast.
"""
default_supported_boot = ['cold']
# Verify image path exists
if not os.path.exists(image_path):
# Invalid image path
return FW_AUTO_ERR_IMAGE
if boot_action in default_supported_boot:
if self.install_firmware(image_path):
# Successful update
return FW_AUTO_INSTALLED
# Failed update (unknown reason)
return FW_AUTO_ERR_UKNOWN
# boot_type did not match (skip)
return FW_AUTO_ERR_BOOT_TYPE
@staticmethod
def _read_generic_file(filename, len, ignore_errors=False):
"""
Read a generic file, returns the contents of the file
"""
result = None
try:
with io.open(filename, 'r') as fileobj:
result = fileobj.read(len)
except IOError as e:
if not ignore_errors:
raise RuntimeError("Failed to read file {} due to {}".format(filename, repr(e)))
return result
@staticmethod
def _get_command_result(cmdline):
try:
proc = subprocess.Popen(cmdline,
stdout=subprocess.PIPE,
shell=True,
stderr=subprocess.STDOUT,
universal_newlines=True)
stdout = proc.communicate()[0]
rc = proc.wait()
result = stdout.rstrip('\n')
if rc != 0:
raise RuntimeError("Failed to execute command {}, return code {}, message {}".format(cmdline, rc, stdout))
except OSError as e:
raise RuntimeError("Failed to execute command {} due to {}".format(cmdline, repr(e)))
return result
def _check_file_validity(self, image_path):
if not os.path.isfile(image_path):
print("ERROR: File {} doesn't exist or is not a file".format(image_path))
return False
name_list = os.path.splitext(image_path)
if self.image_ext_name is not None:
if name_list[1] != self.image_ext_name:
print("ERROR: Extend name of file {} is wrong. Image for {} should have extend name {}".format(image_path, self.name, self.image_ext_name))
return False
else:
if name_list[1]:
print("ERROR: Extend name of file {} is wrong. Image for {} shouldn't have extension".format(image_path, self.name))
return False
return True
class ComponentONIE(Component):
COMPONENT_NAME = 'ONIE'
COMPONENT_DESCRIPTION = 'ONIE - Open Network Install Environment'
ONIE_IMAGE_VERSION_ATTR = 'image_version'
def __init__(self):
super(ComponentONIE, self).__init__()
self.name = self.COMPONENT_NAME
self.description = self.COMPONENT_DESCRIPTION
self.onie_updater = ONIEUpdater()
def __install_firmware(self, image_path):
if not self._check_file_validity(image_path):
return False
try:
print("INFO: Staging {} firmware update with ONIE updater".format(self.name))
self.onie_updater.update_firmware(image_path)
except Exception as e:
print("ERROR: Failed to update {} firmware: {}".format(self.name, str(e)))
return False
return True
def get_firmware_version(self):
return self.onie_updater.get_onie_version()
def get_available_firmware_version(self, image_path):
firmware_info = self.onie_updater.get_onie_firmware_info(image_path)
if self.ONIE_IMAGE_VERSION_ATTR not in firmware_info:
raise RuntimeError("Failed to get {} available firmware version".format(self.name))
return firmware_info[self.ONIE_IMAGE_VERSION_ATTR]
def get_firmware_update_notification(self, image_path):
return "Immediate cold reboot is required to complete {} firmware update".format(self.name)
def install_firmware(self, image_path):
return self.__install_firmware(image_path)
def update_firmware(self, image_path):
self.__install_firmware(image_path)
class ComponentSSD(Component):
COMPONENT_NAME = 'SSD'
COMPONENT_DESCRIPTION = 'SSD - Solid-State Drive'
COMPONENT_FIRMWARE_EXTENSION = '.pkg'
FIRMWARE_VERSION_ATTR = 'Firmware Version'
AVAILABLE_FIRMWARE_VERSION_ATTR = 'Available Firmware Version'
POWER_CYCLE_REQUIRED_ATTR = 'Power Cycle Required'
UPGRADE_REQUIRED_ATTR = 'Upgrade Required'
SSD_INFO_COMMAND = "/usr/bin/mlnx-ssd-fw-update.sh -q"
SSD_FIRMWARE_INFO_COMMAND = "/usr/bin/mlnx-ssd-fw-update.sh -q -i {}"
SSD_FIRMWARE_UPDATE_COMMAND = "/usr/bin/mlnx-ssd-fw-update.sh -y -u -i {}"
def __init__(self):
super(ComponentSSD, self).__init__()
self.name = self.COMPONENT_NAME
self.description = self.COMPONENT_DESCRIPTION
self.image_ext_name = self.COMPONENT_FIRMWARE_EXTENSION
def __install_firmware(self, image_path):
if not self._check_file_validity(image_path):
return False
cmd = self.SSD_FIRMWARE_UPDATE_COMMAND.format(image_path)
try:
print("INFO: Installing {} firmware update".format(self.name))
subprocess.check_call(cmd.split(), universal_newlines=True)
except subprocess.CalledProcessError as e:
print("ERROR: Failed to update {} firmware: {}".format(self.name, str(e)))
return False
return True
def auto_update_firmware(self, image_path, boot_action):
"""
Handling of attempted automatic update for a SSD of a Mellanox switch.
Will first check the image_path to determine if a post-install reboot is required,
then compares it against boot_action to determine whether to proceed with install.
"""
# All devices support cold boot
supported_boot = ['cold']
# Verify image path exists
if not os.path.exists(image_path):
# Invalid image path
return FW_AUTO_ERR_IMAGE
# Check if post_install reboot is required
try:
if self.get_firmware_update_notification(image_path) is None:
# No power cycle required
supported_boot += ['warm', 'fast', 'none', 'any']
except RuntimeError:
# Unknown error from firmware probe
return FW_AUTO_ERR_UKNOWN
if boot_action in supported_boot:
if self.install_firmware(image_path):
# Successful update
return FW_AUTO_INSTALLED
# Failed update (unknown reason)
return FW_AUTO_ERR_UKNOWN
# boot_type did not match (skip)
return FW_AUTO_ERR_BOOT_TYPE
def get_firmware_version(self):
cmd = self.SSD_INFO_COMMAND
try:
output = subprocess.check_output(cmd.split(),
stderr=subprocess.STDOUT,
universal_newlines=True).rstrip('\n')
except subprocess.CalledProcessError as e:
raise RuntimeError("Failed to get {} info: {}".format(self.name, str(e)))
for line in output.splitlines():
if line.startswith(self.FIRMWARE_VERSION_ATTR):
return line.split(':')[1].lstrip(' \t')
raise RuntimeError("Failed to parse {} version".format(self.name))
def get_available_firmware_version(self, image_path):
cmd = self.SSD_FIRMWARE_INFO_COMMAND.format(image_path)
try:
output = subprocess.check_output(cmd.split(),
stderr=subprocess.STDOUT,
universal_newlines=True).rstrip('\n')
except subprocess.CalledProcessError as e:
raise RuntimeError("Failed to get {} firmware info: {}".format(self.name, str(e)))
current_firmware_version = None
available_firmware_version = None
upgrade_required = None
for line in output.splitlines():
if line.startswith(self.FIRMWARE_VERSION_ATTR):
current_firmware_version = line.split(':')[1].lstrip(' \t')
if line.startswith(self.AVAILABLE_FIRMWARE_VERSION_ATTR):
available_firmware_version = line.split(':')[1].lstrip(' \t')
if line.startswith(self.UPGRADE_REQUIRED_ATTR):
upgrade_required = line.split(':')[1].lstrip(' \t')
if upgrade_required is None or upgrade_required not in ['yes', 'no']:
raise RuntimeError("Failed to parse {} firmware upgrade status".format(self.name))
if upgrade_required == 'no':
if current_firmware_version is None:
raise RuntimeError("Failed to parse {} current firmware version".format(self.name))
return current_firmware_version
if available_firmware_version is None:
raise RuntimeError("Failed to parse {} available firmware version".format(self.name))
return available_firmware_version
def get_firmware_update_notification(self, image_path):
cmd = self.SSD_FIRMWARE_INFO_COMMAND.format(image_path)
try:
output = subprocess.check_output(cmd.split(),
stderr=subprocess.STDOUT,
universal_newlines=True).rstrip('\n')
except subprocess.CalledProcessError as e:
raise RuntimeError("Failed to get {} firmware info: {}".format(self.name, str(e)))
power_cycle_required = None
upgrade_required = None
for line in output.splitlines():
if line.startswith(self.POWER_CYCLE_REQUIRED_ATTR):
power_cycle_required = line.split(':')[1].lstrip(' \t')
if line.startswith(self.UPGRADE_REQUIRED_ATTR):
upgrade_required = line.split(':')[1].lstrip(' \t')
if upgrade_required is None or upgrade_required not in ['yes', 'no']:
raise RuntimeError("Failed to parse {} firmware upgrade status".format(self.name))
if upgrade_required == 'no':
return None
if power_cycle_required is None or power_cycle_required not in ['yes', 'no']:
raise RuntimeError("Failed to parse {} firmware power policy".format(self.name))
notification = None
if power_cycle_required == 'yes':
notification = "Immediate power cycle is required to complete {} firmware update".format(self.name)
return notification
def install_firmware(self, image_path):
return self.__install_firmware(image_path)
def update_firmware(self, image_path):
self.__install_firmware(image_path)
class ComponentBIOS(Component):
COMPONENT_NAME = 'BIOS'
COMPONENT_DESCRIPTION = 'BIOS - Basic Input/Output System'
COMPONENT_FIRMWARE_EXTENSION = '.rom'
BIOS_VERSION_COMMAND = 'dmidecode --oem-string 1'
def __init__(self):
super(ComponentBIOS, self).__init__()
self.name = self.COMPONENT_NAME
self.description = self.COMPONENT_DESCRIPTION
self.image_ext_name = self.COMPONENT_FIRMWARE_EXTENSION
self.onie_updater = ONIEUpdater()
def __install_firmware(self, image_path):
if not self.onie_updater.is_non_onie_firmware_update_supported():
print("ERROR: ONIE {} or later is required".format(self.onie_updater.get_onie_required_version()))
return False
if not self._check_file_validity(image_path):
return False
try:
print("INFO: Staging {} firmware update with ONIE updater".format(self.name))
self.onie_updater.update_firmware(image_path)
except Exception as e:
print("ERROR: Failed to update {} firmware: {}".format(self.name, str(e)))
return False
return True
def get_firmware_version(self):
cmd = self.BIOS_VERSION_COMMAND
try:
version = subprocess.check_output(cmd.split(),
stderr=subprocess.STDOUT,
universal_newlines=True).rstrip('\n')
except subprocess.CalledProcessError as e:
raise RuntimeError("Failed to get {} version: {}".format(self.name, str(e)))
return version
def get_available_firmware_version(self, image_path):
raise NotImplementedError("{} component doesn't support firmware version query".format(self.name))
def get_firmware_update_notification(self, image_path):
return "Immediate cold reboot is required to complete {} firmware update".format(self.name)
def install_firmware(self, image_path):
return self.__install_firmware(image_path)
def update_firmware(self, image_path):
self.__install_firmware(image_path)
class ComponentCPLD(Component):
COMPONENT_NAME = 'CPLD{}'
COMPONENT_DESCRIPTION = 'CPLD - Complex Programmable Logic Device'
COMPONENT_FIRMWARE_EXTENSION = '.vme'
MST_DEVICE_PATH = '/dev/mst'
MST_DEVICE_PATTERN = 'mt[0-9]*_pci_cr0'
CPLD_NUMBER_FILE = '/var/run/hw-management/config/cpld_num'
CPLD_PART_NUMBER_FILE = '/var/run/hw-management/system/cpld{}_pn'
CPLD_VERSION_FILE = '/var/run/hw-management/system/cpld{}_version'
CPLD_VERSION_MINOR_FILE = '/var/run/hw-management/system/cpld{}_version_min'
CPLD_NUMBER_MAX_LENGTH = 1
CPLD_PART_NUMBER_MAX_LENGTH = 6
CPLD_VERSION_MAX_LENGTH = 2
CPLD_VERSION_MINOR_MAX_LENGTH = 2
CPLD_PART_NUMBER_DEFAULT = '0'
CPLD_VERSION_MINOR_DEFAULT = '0'
CPLD_FIRMWARE_UPDATE_COMMAND = 'cpldupdate --dev {} --print-progress {}'
def __init__(self, idx):
super(ComponentCPLD, self).__init__()
self.idx = idx
self.name = self.COMPONENT_NAME.format(self.idx)
self.description = self.COMPONENT_DESCRIPTION
self.image_ext_name = self.COMPONENT_FIRMWARE_EXTENSION
def __get_mst_device(self):
if not os.path.exists(self.MST_DEVICE_PATH):
print("ERROR: mst driver is not loaded")
return None
pattern = os.path.join(self.MST_DEVICE_PATH, self.MST_DEVICE_PATTERN)
mst_dev_list = glob.glob(pattern)
if not mst_dev_list or len(mst_dev_list) != 1:
devices = str(os.listdir(self.MST_DEVICE_PATH))
print("ERROR: Failed to get mst device: pattern={}, devices={}".format(pattern, devices))
return None
return mst_dev_list[0]
def __install_firmware(self, image_path):
if not self._check_file_validity(image_path):
return False
mst_dev = self.__get_mst_device()
if mst_dev is None:
return False
cmd = self.CPLD_FIRMWARE_UPDATE_COMMAND.format(mst_dev, image_path)
try:
print("INFO: Installing {} firmware update: path={}".format(self.name, image_path))
subprocess.check_call(cmd.split(), universal_newlines=True)
except subprocess.CalledProcessError as e:
print("ERROR: Failed to update {} firmware: {}".format(self.name, str(e)))
return False
return True
def get_firmware_version(self):
part_number_file = self.CPLD_PART_NUMBER_FILE.format(self.idx)
version_file = self.CPLD_VERSION_FILE.format(self.idx)
version_minor_file = self.CPLD_VERSION_MINOR_FILE.format(self.idx)
part_number = self._read_generic_file(part_number_file, self.CPLD_PART_NUMBER_MAX_LENGTH, True)
version = self._read_generic_file(version_file, self.CPLD_VERSION_MAX_LENGTH)
version_minor = self._read_generic_file(version_minor_file, self.CPLD_VERSION_MINOR_MAX_LENGTH, True)
if part_number is None:
part_number = self.CPLD_PART_NUMBER_DEFAULT
if version_minor is None:
version_minor = self.CPLD_VERSION_MINOR_DEFAULT
part_number = part_number.rstrip('\n').zfill(self.CPLD_PART_NUMBER_MAX_LENGTH)
version = version.rstrip('\n').zfill(self.CPLD_VERSION_MAX_LENGTH)
version_minor = version_minor.rstrip('\n').zfill(self.CPLD_VERSION_MINOR_MAX_LENGTH)
return "CPLD{}_REV{}{}".format(part_number, version, version_minor)
def get_available_firmware_version(self, image_path):
with MPFAManager(image_path) as mpfa:
if not mpfa.get_metadata().has_option('version', self.name):
raise RuntimeError("Failed to get {} available firmware version".format(self.name))
return mpfa.get_metadata().get('version', self.name)
def get_firmware_update_notification(self, image_path):
name, ext = os.path.splitext(os.path.basename(image_path))
if ext == self.COMPONENT_FIRMWARE_EXTENSION:
return "Power cycle (with 30 sec delay) or refresh image is required to complete {} firmware update".format(self.name)
return "Immediate power cycle is required to complete {} firmware update".format(self.name)
def install_firmware(self, image_path):
return self.__install_firmware(image_path)
def update_firmware(self, image_path):
with MPFAManager(image_path) as mpfa:
if not mpfa.get_metadata().has_option('firmware', 'burn'):
raise RuntimeError("Failed to get {} burn firmware".format(self.name))
if not mpfa.get_metadata().has_option('firmware', 'refresh'):
raise RuntimeError("Failed to get {} refresh firmware".format(self.name))
burn_firmware = mpfa.get_metadata().get('firmware', 'burn')
refresh_firmware = mpfa.get_metadata().get('firmware', 'refresh')
print("INFO: Processing {} burn file: firmware install".format(self.name))
if not self.__install_firmware(os.path.join(mpfa.get_path(), burn_firmware)):
return
print("INFO: Processing {} refresh file: firmware update".format(self.name))
self.__install_firmware(os.path.join(mpfa.get_path(), refresh_firmware))
@classmethod
def get_component_list(cls):
component_list = [ ]
cpld_number = cls._read_generic_file(cls.CPLD_NUMBER_FILE, cls.CPLD_NUMBER_MAX_LENGTH)
cpld_number = cpld_number.rstrip('\n')
for cpld_idx in range(1, int(cpld_number) + 1):
component_list.append(cls(cpld_idx))
return component_list