29601366ee
Why I did it The Mellanox platform is required to support the fwutil auto-update feature defined here This is to allow switches, when performing SONiC upgrades to choose whether to perform firmware upgrades that may interrupt the data plane through a cold boot. How I did it Two methods were added to the component implementations for mellanox. In the base Component class we add a default function that chooses to skip the installation of any firmware unless the cold boot option is provided. This is because the Mellanox platform, by default, does not support installing firmware on ONIE, the CPLD, or the BIOS "on-the-fly". In the ComponentSSD class we add a function that behaves similarly but uses the Mellanox specific SSD firmware upgrade tool to check if the current SSD supports being upgraded on the fly in order to decide whether to skip or perform the installation. How to verify it Unit tests are included with this PR. These test will run on build of target sonic-mellanox.bin You may also perform fwutil auto-update ... commands after Azure/sonic-utilities#1242 is merged in.
813 lines
30 KiB
Python
813 lines
30 KiB
Python
#############################################################################
|
|
# Mellanox
|
|
#
|
|
# implementation of new platform api
|
|
#############################################################################
|
|
|
|
|
|
try:
|
|
import os
|
|
import io
|
|
import re
|
|
import sys
|
|
import glob
|
|
import tempfile
|
|
import subprocess
|
|
if sys.version_info[0] > 2:
|
|
import configparser
|
|
else:
|
|
import ConfigParser as configparser
|
|
|
|
from sonic_platform_base.component_base import ComponentBase, \
|
|
FW_AUTO_INSTALLED, \
|
|
FW_AUTO_ERR_BOOT_TYPE, \
|
|
FW_AUTO_ERR_IMAGE, \
|
|
FW_AUTO_ERR_UKNOWN
|
|
except ImportError as e:
|
|
raise ImportError(str(e) + "- required module not found")
|
|
|
|
|
|
class MPFAManager(object):
|
|
MPFA_EXTENSION = '.mpfa'
|
|
|
|
MPFA_EXTRACT_COMMAND = 'tar xzf {} -C {}'
|
|
MPFA_CLEANUP_COMMAND = 'rm -rf {}'
|
|
|
|
def __init__(self, mpfa_path):
|
|
self.__mpfa_path = mpfa_path
|
|
self.__contents_path = None
|
|
self.__metadata = None
|
|
|
|
def __enter__(self):
|
|
self.extract()
|
|
return self
|
|
|
|
def __exit__(self, exc_type, exc_value, traceback):
|
|
self.cleanup()
|
|
|
|
def __validate_path(self, mpfa_path):
|
|
if not os.path.isfile(mpfa_path):
|
|
raise RuntimeError("MPFA doesn't exist: path={}".format(mpfa_path))
|
|
|
|
name, ext = os.path.splitext(mpfa_path)
|
|
if ext != self.MPFA_EXTENSION:
|
|
raise RuntimeError("MPFA doesn't have valid extension: path={}".format(mpfa_path))
|
|
|
|
def __extract_contents(self, mpfa_path):
|
|
contents_path = tempfile.mkdtemp(prefix='mpfa-')
|
|
|
|
cmd = self.MPFA_EXTRACT_COMMAND.format(mpfa_path, contents_path)
|
|
subprocess.check_call(cmd.split(), universal_newlines=True)
|
|
|
|
self.__contents_path = contents_path
|
|
|
|
def __parse_metadata(self, contents_path):
|
|
metadata_path = os.path.join(contents_path, 'metadata.ini')
|
|
|
|
if not os.path.isfile(metadata_path):
|
|
raise RuntimeError("MPFA metadata doesn't exist: path={}".format(metadata_path))
|
|
|
|
cp = configparser.ConfigParser()
|
|
with io.open(metadata_path, 'r') as metadata_ini:
|
|
cp.readfp(metadata_ini)
|
|
|
|
self.__metadata = cp
|
|
|
|
def extract(self):
|
|
if self.is_extracted():
|
|
return
|
|
|
|
self.__validate_path(self.__mpfa_path)
|
|
self.__extract_contents(self.__mpfa_path)
|
|
self.__parse_metadata(self.__contents_path)
|
|
|
|
def cleanup(self):
|
|
if os.path.exists(self.__contents_path):
|
|
cmd = self.MPFA_CLEANUP_COMMAND.format(self.__contents_path)
|
|
subprocess.check_call(cmd.split(), universal_newlines=True)
|
|
|
|
self.__contents_path = None
|
|
self.__metadata = None
|
|
|
|
def get_path(self):
|
|
return self.__contents_path
|
|
|
|
def get_metadata(self):
|
|
return self.__metadata
|
|
|
|
def is_extracted(self):
|
|
return self.__contents_path is not None and os.path.exists(self.__contents_path)
|
|
|
|
|
|
class ONIEUpdater(object):
|
|
ONIE_FW_UPDATE_CMD_ADD = '/usr/bin/mlnx-onie-fw-update.sh add {}'
|
|
ONIE_FW_UPDATE_CMD_REMOVE = '/usr/bin/mlnx-onie-fw-update.sh remove {}'
|
|
ONIE_FW_UPDATE_CMD_UPDATE = '/usr/bin/mlnx-onie-fw-update.sh update'
|
|
ONIE_FW_UPDATE_CMD_SHOW_PENDING = '/usr/bin/mlnx-onie-fw-update.sh show-pending'
|
|
|
|
ONIE_VERSION_PARSE_PATTERN = '([0-9]{4})\.([0-9]{2})-([0-9]+)\.([0-9]+)\.([0-9]+)-([0-9]+)'
|
|
ONIE_VERSION_BASE_PARSE_PATTERN = '([0-9]+)\.([0-9]+)\.([0-9]+)'
|
|
ONIE_VERSION_REQUIRED = '5.2.0016'
|
|
|
|
ONIE_VERSION_ATTR = 'onie_version'
|
|
ONIE_NO_PENDING_UPDATES_ATTR = 'No pending firmware updates present'
|
|
|
|
ONIE_IMAGE_INFO_COMMAND = '/bin/bash {} -q -i'
|
|
|
|
def __mount_onie_fs(self):
|
|
fs_mountpoint = '/mnt/onie-fs'
|
|
onie_path = '/lib/onie'
|
|
|
|
if os.path.lexists(onie_path) or os.path.exists(fs_mountpoint):
|
|
self.__umount_onie_fs()
|
|
|
|
cmd = "fdisk -l | grep 'ONIE boot' | awk '{print $1}'"
|
|
fs_path = subprocess.check_output(cmd,
|
|
stderr=subprocess.STDOUT,
|
|
shell=True,
|
|
universal_newlines=True).rstrip('\n')
|
|
|
|
os.mkdir(fs_mountpoint)
|
|
cmd = "mount -n -r -t ext4 {} {}".format(fs_path, fs_mountpoint)
|
|
subprocess.check_call(cmd, shell=True, universal_newlines=True)
|
|
|
|
fs_onie_path = os.path.join(fs_mountpoint, 'onie/tools/lib/onie')
|
|
os.symlink(fs_onie_path, onie_path)
|
|
|
|
return fs_mountpoint
|
|
|
|
def __umount_onie_fs(self):
|
|
fs_mountpoint = '/mnt/onie-fs'
|
|
onie_path = '/lib/onie'
|
|
|
|
if os.path.islink(onie_path):
|
|
os.unlink(onie_path)
|
|
|
|
if os.path.ismount(fs_mountpoint):
|
|
cmd = "umount -rf {}".format(fs_mountpoint)
|
|
subprocess.check_call(cmd, shell=True, universal_newlines=True)
|
|
|
|
if os.path.exists(fs_mountpoint):
|
|
os.rmdir(fs_mountpoint)
|
|
|
|
def __stage_update(self, image_path):
|
|
cmd = self.ONIE_FW_UPDATE_CMD_ADD.format(image_path)
|
|
|
|
try:
|
|
subprocess.check_call(cmd.split(), universal_newlines=True)
|
|
except subprocess.CalledProcessError as e:
|
|
raise RuntimeError("Failed to stage firmware update: {}".format(str(e)))
|
|
|
|
def __unstage_update(self, image_path):
|
|
cmd = self.ONIE_FW_UPDATE_CMD_REMOVE.format(os.path.basename(image_path))
|
|
|
|
try:
|
|
subprocess.check_call(cmd.split(), universal_newlines=True)
|
|
except subprocess.CalledProcessError as e:
|
|
raise RuntimeError("Failed to unstage firmware update: {}".format(str(e)))
|
|
|
|
def __trigger_update(self):
|
|
cmd = self.ONIE_FW_UPDATE_CMD_UPDATE
|
|
|
|
try:
|
|
subprocess.check_call(cmd.split(), universal_newlines=True)
|
|
except subprocess.CalledProcessError as e:
|
|
raise RuntimeError("Failed to trigger firmware update: {}".format(str(e)))
|
|
|
|
def __is_update_staged(self, image_path):
|
|
cmd = self.ONIE_FW_UPDATE_CMD_SHOW_PENDING
|
|
|
|
try:
|
|
output = subprocess.check_output(cmd.split(),
|
|
stderr=subprocess.STDOUT,
|
|
universal_newlines=True).rstrip('\n')
|
|
except subprocess.CalledProcessError as e:
|
|
raise RuntimeError("Failed to get pending firmware updates: {}".format(str(e)))
|
|
|
|
basename = os.path.basename(image_path)
|
|
|
|
for line in output.splitlines():
|
|
if line.startswith(basename):
|
|
return True
|
|
|
|
return False
|
|
|
|
def parse_onie_version(self, version, is_base=False):
|
|
onie_year = None
|
|
onie_month = None
|
|
onie_major = None
|
|
onie_minor = None
|
|
onie_release = None
|
|
onie_baudrate = None
|
|
|
|
if is_base:
|
|
pattern = self.ONIE_VERSION_BASE_PARSE_PATTERN
|
|
|
|
m = re.search(pattern, version)
|
|
if not m:
|
|
raise RuntimeError("Failed to parse ONIE version: pattern={}, version={}".format(pattern, version))
|
|
|
|
onie_major = m.group(1)
|
|
onie_minor = m.group(2)
|
|
onie_release = m.group(3)
|
|
|
|
return onie_year, onie_month, onie_major, onie_minor, onie_release, onie_baudrate
|
|
|
|
pattern = self.ONIE_VERSION_PARSE_PATTERN
|
|
|
|
m = re.search(pattern, version)
|
|
if not m:
|
|
raise RuntimeError("Failed to parse ONIE version: pattern={}, version={}".format(pattern, version))
|
|
|
|
onie_year = m.group(1)
|
|
onie_month = m.group(2)
|
|
onie_major = m.group(3)
|
|
onie_minor = m.group(4)
|
|
onie_release = m.group(5)
|
|
onie_baudrate = m.group(6)
|
|
|
|
return onie_year, onie_month, onie_major, onie_minor, onie_release, onie_baudrate
|
|
|
|
def get_onie_required_version(self):
|
|
return self.ONIE_VERSION_REQUIRED
|
|
|
|
def get_onie_version(self):
|
|
version = None
|
|
|
|
try:
|
|
fs_mountpoint = self.__mount_onie_fs()
|
|
machine_conf_path = os.path.join(fs_mountpoint, 'onie/grub/grub-machine.cfg')
|
|
|
|
with open(machine_conf_path, 'r') as machine_conf:
|
|
for line in machine_conf:
|
|
if line.startswith(self.ONIE_VERSION_ATTR):
|
|
items = line.rstrip('\n').split('=')
|
|
|
|
if len(items) != 2:
|
|
raise RuntimeError("Failed to parse ONIE info: line={}".format(line))
|
|
|
|
version = items[1]
|
|
break
|
|
|
|
if version is None:
|
|
raise RuntimeError("Failed to parse ONIE version")
|
|
finally:
|
|
self.__umount_onie_fs()
|
|
|
|
return version
|
|
|
|
def get_onie_firmware_info(self, image_path):
|
|
firmware_info = { }
|
|
|
|
try:
|
|
self.__mount_onie_fs()
|
|
|
|
cmd = self.ONIE_IMAGE_INFO_COMMAND.format(image_path)
|
|
|
|
try:
|
|
output = subprocess.check_output(cmd.split(),
|
|
stderr=subprocess.STDOUT,
|
|
universal_newlines=True).rstrip('\n')
|
|
except subprocess.CalledProcessError as e:
|
|
raise RuntimeError("Failed to get ONIE firmware info: {}".format(str(e)))
|
|
|
|
for line in output.splitlines():
|
|
items = line.split('=')
|
|
|
|
if len(items) != 2:
|
|
raise RuntimeError("Failed to parse ONIE firmware info: line={}".format(line))
|
|
|
|
firmware_info[items[0]] = items[1]
|
|
finally:
|
|
self.__umount_onie_fs()
|
|
|
|
return firmware_info
|
|
|
|
def update_firmware(self, image_path):
|
|
cmd = self.ONIE_FW_UPDATE_CMD_SHOW_PENDING
|
|
|
|
try:
|
|
output = subprocess.check_output(cmd.split(),
|
|
stderr=subprocess.STDOUT,
|
|
universal_newlines=True).rstrip('\n')
|
|
except subprocess.CalledProcessError as e:
|
|
raise RuntimeError("Failed to get pending firmware updates: {}".format(str(e)))
|
|
|
|
no_pending_updates = False
|
|
|
|
for line in output.splitlines():
|
|
if line.startswith(self.ONIE_NO_PENDING_UPDATES_ATTR):
|
|
no_pending_updates = True
|
|
break
|
|
|
|
if not no_pending_updates:
|
|
raise RuntimeError("Failed to complete firmware update: pending updates are present")
|
|
|
|
try:
|
|
self.__stage_update(image_path)
|
|
self.__trigger_update()
|
|
except:
|
|
if self.__is_update_staged(image_path):
|
|
self.__unstage_update(image_path)
|
|
raise
|
|
|
|
def is_non_onie_firmware_update_supported(self):
|
|
current_version = self.get_onie_version()
|
|
_, _, major1, minor1, release1, _ = self.parse_onie_version(current_version)
|
|
version1 = int("{}{}{}".format(major1, minor1, release1))
|
|
|
|
required_version = self.get_onie_required_version()
|
|
_, _, major2, minor2, release2, _ = self.parse_onie_version(required_version, True)
|
|
version2 = int("{}{}{}".format(major2, minor2, release2))
|
|
|
|
return version1 >= version2
|
|
|
|
|
|
class Component(ComponentBase):
|
|
def __init__(self):
|
|
super(Component, self).__init__()
|
|
self.name = None
|
|
self.description = None
|
|
self.image_ext_name = None
|
|
|
|
def get_name(self):
|
|
return self.name
|
|
|
|
def get_description(self):
|
|
return self.description
|
|
|
|
def auto_update_firmware(self, image_path, boot_action):
|
|
"""
|
|
Default handling of attempted automatic update for a component of a Mellanox switch.
|
|
Will skip the installation if the boot_action is 'warm' or 'fast' and will call update_firmware()
|
|
if boot_action is fast.
|
|
"""
|
|
|
|
default_supported_boot = ['cold']
|
|
|
|
# Verify image path exists
|
|
if not os.path.exists(image_path):
|
|
# Invalid image path
|
|
return FW_AUTO_ERR_IMAGE
|
|
|
|
if boot_action in default_supported_boot:
|
|
if self.install_firmware(image_path):
|
|
# Successful update
|
|
return FW_AUTO_INSTALLED
|
|
# Failed update (unknown reason)
|
|
return FW_AUTO_ERR_UKNOWN
|
|
|
|
# boot_type did not match (skip)
|
|
return FW_AUTO_ERR_BOOT_TYPE
|
|
|
|
@staticmethod
|
|
def _read_generic_file(filename, len, ignore_errors=False):
|
|
"""
|
|
Read a generic file, returns the contents of the file
|
|
"""
|
|
result = None
|
|
|
|
try:
|
|
with io.open(filename, 'r') as fileobj:
|
|
result = fileobj.read(len)
|
|
except IOError as e:
|
|
if not ignore_errors:
|
|
raise RuntimeError("Failed to read file {} due to {}".format(filename, repr(e)))
|
|
|
|
return result
|
|
|
|
@staticmethod
|
|
def _get_command_result(cmdline):
|
|
try:
|
|
proc = subprocess.Popen(cmdline,
|
|
stdout=subprocess.PIPE,
|
|
shell=True,
|
|
stderr=subprocess.STDOUT,
|
|
universal_newlines=True)
|
|
stdout = proc.communicate()[0]
|
|
rc = proc.wait()
|
|
result = stdout.rstrip('\n')
|
|
if rc != 0:
|
|
raise RuntimeError("Failed to execute command {}, return code {}, message {}".format(cmdline, rc, stdout))
|
|
|
|
except OSError as e:
|
|
raise RuntimeError("Failed to execute command {} due to {}".format(cmdline, repr(e)))
|
|
|
|
return result
|
|
|
|
def _check_file_validity(self, image_path):
|
|
if not os.path.isfile(image_path):
|
|
print("ERROR: File {} doesn't exist or is not a file".format(image_path))
|
|
return False
|
|
|
|
name_list = os.path.splitext(image_path)
|
|
if self.image_ext_name is not None:
|
|
if name_list[1] != self.image_ext_name:
|
|
print("ERROR: Extend name of file {} is wrong. Image for {} should have extend name {}".format(image_path, self.name, self.image_ext_name))
|
|
return False
|
|
else:
|
|
if name_list[1]:
|
|
print("ERROR: Extend name of file {} is wrong. Image for {} shouldn't have extension".format(image_path, self.name))
|
|
return False
|
|
|
|
return True
|
|
|
|
|
|
class ComponentONIE(Component):
|
|
COMPONENT_NAME = 'ONIE'
|
|
COMPONENT_DESCRIPTION = 'ONIE - Open Network Install Environment'
|
|
|
|
ONIE_IMAGE_VERSION_ATTR = 'image_version'
|
|
|
|
def __init__(self):
|
|
super(ComponentONIE, self).__init__()
|
|
|
|
self.name = self.COMPONENT_NAME
|
|
self.description = self.COMPONENT_DESCRIPTION
|
|
self.onie_updater = ONIEUpdater()
|
|
|
|
def __install_firmware(self, image_path):
|
|
if not self._check_file_validity(image_path):
|
|
return False
|
|
|
|
try:
|
|
print("INFO: Staging {} firmware update with ONIE updater".format(self.name))
|
|
self.onie_updater.update_firmware(image_path)
|
|
except Exception as e:
|
|
print("ERROR: Failed to update {} firmware: {}".format(self.name, str(e)))
|
|
return False
|
|
|
|
return True
|
|
|
|
def get_firmware_version(self):
|
|
return self.onie_updater.get_onie_version()
|
|
|
|
def get_available_firmware_version(self, image_path):
|
|
firmware_info = self.onie_updater.get_onie_firmware_info(image_path)
|
|
if self.ONIE_IMAGE_VERSION_ATTR not in firmware_info:
|
|
raise RuntimeError("Failed to get {} available firmware version".format(self.name))
|
|
|
|
return firmware_info[self.ONIE_IMAGE_VERSION_ATTR]
|
|
|
|
def get_firmware_update_notification(self, image_path):
|
|
return "Immediate cold reboot is required to complete {} firmware update".format(self.name)
|
|
|
|
def install_firmware(self, image_path):
|
|
return self.__install_firmware(image_path)
|
|
|
|
def update_firmware(self, image_path):
|
|
self.__install_firmware(image_path)
|
|
|
|
|
|
class ComponentSSD(Component):
|
|
COMPONENT_NAME = 'SSD'
|
|
COMPONENT_DESCRIPTION = 'SSD - Solid-State Drive'
|
|
COMPONENT_FIRMWARE_EXTENSION = '.pkg'
|
|
|
|
FIRMWARE_VERSION_ATTR = 'Firmware Version'
|
|
AVAILABLE_FIRMWARE_VERSION_ATTR = 'Available Firmware Version'
|
|
POWER_CYCLE_REQUIRED_ATTR = 'Power Cycle Required'
|
|
UPGRADE_REQUIRED_ATTR = 'Upgrade Required'
|
|
|
|
SSD_INFO_COMMAND = "/usr/bin/mlnx-ssd-fw-update.sh -q"
|
|
SSD_FIRMWARE_INFO_COMMAND = "/usr/bin/mlnx-ssd-fw-update.sh -q -i {}"
|
|
SSD_FIRMWARE_UPDATE_COMMAND = "/usr/bin/mlnx-ssd-fw-update.sh -y -u -i {}"
|
|
|
|
def __init__(self):
|
|
super(ComponentSSD, self).__init__()
|
|
|
|
self.name = self.COMPONENT_NAME
|
|
self.description = self.COMPONENT_DESCRIPTION
|
|
self.image_ext_name = self.COMPONENT_FIRMWARE_EXTENSION
|
|
|
|
def __install_firmware(self, image_path):
|
|
if not self._check_file_validity(image_path):
|
|
return False
|
|
|
|
cmd = self.SSD_FIRMWARE_UPDATE_COMMAND.format(image_path)
|
|
|
|
try:
|
|
print("INFO: Installing {} firmware update".format(self.name))
|
|
subprocess.check_call(cmd.split(), universal_newlines=True)
|
|
except subprocess.CalledProcessError as e:
|
|
print("ERROR: Failed to update {} firmware: {}".format(self.name, str(e)))
|
|
return False
|
|
|
|
return True
|
|
|
|
def auto_update_firmware(self, image_path, boot_action):
|
|
"""
|
|
Handling of attempted automatic update for a SSD of a Mellanox switch.
|
|
Will first check the image_path to determine if a post-install reboot is required,
|
|
then compares it against boot_action to determine whether to proceed with install.
|
|
"""
|
|
|
|
# All devices support cold boot
|
|
supported_boot = ['cold']
|
|
|
|
# Verify image path exists
|
|
if not os.path.exists(image_path):
|
|
# Invalid image path
|
|
return FW_AUTO_ERR_IMAGE
|
|
|
|
# Check if post_install reboot is required
|
|
try:
|
|
if self.get_firmware_update_notification(image_path) is None:
|
|
# No power cycle required
|
|
supported_boot += ['warm', 'fast', 'none', 'any']
|
|
except RuntimeError:
|
|
# Unknown error from firmware probe
|
|
return FW_AUTO_ERR_UKNOWN
|
|
|
|
if boot_action in supported_boot:
|
|
if self.install_firmware(image_path):
|
|
# Successful update
|
|
return FW_AUTO_INSTALLED
|
|
# Failed update (unknown reason)
|
|
return FW_AUTO_ERR_UKNOWN
|
|
|
|
# boot_type did not match (skip)
|
|
return FW_AUTO_ERR_BOOT_TYPE
|
|
|
|
def get_firmware_version(self):
|
|
cmd = self.SSD_INFO_COMMAND
|
|
|
|
try:
|
|
output = subprocess.check_output(cmd.split(),
|
|
stderr=subprocess.STDOUT,
|
|
universal_newlines=True).rstrip('\n')
|
|
except subprocess.CalledProcessError as e:
|
|
raise RuntimeError("Failed to get {} info: {}".format(self.name, str(e)))
|
|
|
|
for line in output.splitlines():
|
|
if line.startswith(self.FIRMWARE_VERSION_ATTR):
|
|
return line.split(':')[1].lstrip(' \t')
|
|
|
|
raise RuntimeError("Failed to parse {} version".format(self.name))
|
|
|
|
def get_available_firmware_version(self, image_path):
|
|
cmd = self.SSD_FIRMWARE_INFO_COMMAND.format(image_path)
|
|
|
|
try:
|
|
output = subprocess.check_output(cmd.split(),
|
|
stderr=subprocess.STDOUT,
|
|
universal_newlines=True).rstrip('\n')
|
|
except subprocess.CalledProcessError as e:
|
|
raise RuntimeError("Failed to get {} firmware info: {}".format(self.name, str(e)))
|
|
|
|
current_firmware_version = None
|
|
available_firmware_version = None
|
|
upgrade_required = None
|
|
|
|
for line in output.splitlines():
|
|
if line.startswith(self.FIRMWARE_VERSION_ATTR):
|
|
current_firmware_version = line.split(':')[1].lstrip(' \t')
|
|
if line.startswith(self.AVAILABLE_FIRMWARE_VERSION_ATTR):
|
|
available_firmware_version = line.split(':')[1].lstrip(' \t')
|
|
if line.startswith(self.UPGRADE_REQUIRED_ATTR):
|
|
upgrade_required = line.split(':')[1].lstrip(' \t')
|
|
|
|
if upgrade_required is None or upgrade_required not in ['yes', 'no']:
|
|
raise RuntimeError("Failed to parse {} firmware upgrade status".format(self.name))
|
|
|
|
if upgrade_required == 'no':
|
|
if current_firmware_version is None:
|
|
raise RuntimeError("Failed to parse {} current firmware version".format(self.name))
|
|
return current_firmware_version
|
|
|
|
if available_firmware_version is None:
|
|
raise RuntimeError("Failed to parse {} available firmware version".format(self.name))
|
|
|
|
return available_firmware_version
|
|
|
|
def get_firmware_update_notification(self, image_path):
|
|
cmd = self.SSD_FIRMWARE_INFO_COMMAND.format(image_path)
|
|
|
|
try:
|
|
output = subprocess.check_output(cmd.split(),
|
|
stderr=subprocess.STDOUT,
|
|
universal_newlines=True).rstrip('\n')
|
|
except subprocess.CalledProcessError as e:
|
|
raise RuntimeError("Failed to get {} firmware info: {}".format(self.name, str(e)))
|
|
|
|
power_cycle_required = None
|
|
upgrade_required = None
|
|
|
|
for line in output.splitlines():
|
|
if line.startswith(self.POWER_CYCLE_REQUIRED_ATTR):
|
|
power_cycle_required = line.split(':')[1].lstrip(' \t')
|
|
if line.startswith(self.UPGRADE_REQUIRED_ATTR):
|
|
upgrade_required = line.split(':')[1].lstrip(' \t')
|
|
|
|
if upgrade_required is None or upgrade_required not in ['yes', 'no']:
|
|
raise RuntimeError("Failed to parse {} firmware upgrade status".format(self.name))
|
|
|
|
if upgrade_required == 'no':
|
|
return None
|
|
|
|
if power_cycle_required is None or power_cycle_required not in ['yes', 'no']:
|
|
raise RuntimeError("Failed to parse {} firmware power policy".format(self.name))
|
|
|
|
notification = None
|
|
|
|
if power_cycle_required == 'yes':
|
|
notification = "Immediate power cycle is required to complete {} firmware update".format(self.name)
|
|
|
|
return notification
|
|
|
|
def install_firmware(self, image_path):
|
|
return self.__install_firmware(image_path)
|
|
|
|
def update_firmware(self, image_path):
|
|
self.__install_firmware(image_path)
|
|
|
|
|
|
class ComponentBIOS(Component):
|
|
COMPONENT_NAME = 'BIOS'
|
|
COMPONENT_DESCRIPTION = 'BIOS - Basic Input/Output System'
|
|
COMPONENT_FIRMWARE_EXTENSION = '.rom'
|
|
|
|
BIOS_VERSION_COMMAND = 'dmidecode --oem-string 1'
|
|
|
|
def __init__(self):
|
|
super(ComponentBIOS, self).__init__()
|
|
|
|
self.name = self.COMPONENT_NAME
|
|
self.description = self.COMPONENT_DESCRIPTION
|
|
self.image_ext_name = self.COMPONENT_FIRMWARE_EXTENSION
|
|
self.onie_updater = ONIEUpdater()
|
|
|
|
def __install_firmware(self, image_path):
|
|
if not self.onie_updater.is_non_onie_firmware_update_supported():
|
|
print("ERROR: ONIE {} or later is required".format(self.onie_updater.get_onie_required_version()))
|
|
return False
|
|
|
|
if not self._check_file_validity(image_path):
|
|
return False
|
|
|
|
try:
|
|
print("INFO: Staging {} firmware update with ONIE updater".format(self.name))
|
|
self.onie_updater.update_firmware(image_path)
|
|
except Exception as e:
|
|
print("ERROR: Failed to update {} firmware: {}".format(self.name, str(e)))
|
|
return False
|
|
|
|
return True
|
|
|
|
def get_firmware_version(self):
|
|
cmd = self.BIOS_VERSION_COMMAND
|
|
|
|
try:
|
|
version = subprocess.check_output(cmd.split(),
|
|
stderr=subprocess.STDOUT,
|
|
universal_newlines=True).rstrip('\n')
|
|
except subprocess.CalledProcessError as e:
|
|
raise RuntimeError("Failed to get {} version: {}".format(self.name, str(e)))
|
|
|
|
return version
|
|
|
|
def get_available_firmware_version(self, image_path):
|
|
raise NotImplementedError("{} component doesn't support firmware version query".format(self.name))
|
|
|
|
def get_firmware_update_notification(self, image_path):
|
|
return "Immediate cold reboot is required to complete {} firmware update".format(self.name)
|
|
|
|
def install_firmware(self, image_path):
|
|
return self.__install_firmware(image_path)
|
|
|
|
def update_firmware(self, image_path):
|
|
self.__install_firmware(image_path)
|
|
|
|
|
|
class ComponentCPLD(Component):
|
|
COMPONENT_NAME = 'CPLD{}'
|
|
COMPONENT_DESCRIPTION = 'CPLD - Complex Programmable Logic Device'
|
|
COMPONENT_FIRMWARE_EXTENSION = '.vme'
|
|
|
|
MST_DEVICE_PATH = '/dev/mst'
|
|
MST_DEVICE_PATTERN = 'mt[0-9]*_pci_cr0'
|
|
|
|
CPLD_NUMBER_FILE = '/var/run/hw-management/config/cpld_num'
|
|
CPLD_PART_NUMBER_FILE = '/var/run/hw-management/system/cpld{}_pn'
|
|
CPLD_VERSION_FILE = '/var/run/hw-management/system/cpld{}_version'
|
|
CPLD_VERSION_MINOR_FILE = '/var/run/hw-management/system/cpld{}_version_min'
|
|
|
|
CPLD_NUMBER_MAX_LENGTH = 1
|
|
CPLD_PART_NUMBER_MAX_LENGTH = 6
|
|
CPLD_VERSION_MAX_LENGTH = 2
|
|
CPLD_VERSION_MINOR_MAX_LENGTH = 2
|
|
|
|
CPLD_PART_NUMBER_DEFAULT = '0'
|
|
CPLD_VERSION_MINOR_DEFAULT = '0'
|
|
|
|
CPLD_FIRMWARE_UPDATE_COMMAND = 'cpldupdate --dev {} --print-progress {}'
|
|
|
|
def __init__(self, idx):
|
|
super(ComponentCPLD, self).__init__()
|
|
|
|
self.idx = idx
|
|
self.name = self.COMPONENT_NAME.format(self.idx)
|
|
self.description = self.COMPONENT_DESCRIPTION
|
|
self.image_ext_name = self.COMPONENT_FIRMWARE_EXTENSION
|
|
|
|
def __get_mst_device(self):
|
|
if not os.path.exists(self.MST_DEVICE_PATH):
|
|
print("ERROR: mst driver is not loaded")
|
|
return None
|
|
|
|
pattern = os.path.join(self.MST_DEVICE_PATH, self.MST_DEVICE_PATTERN)
|
|
|
|
mst_dev_list = glob.glob(pattern)
|
|
if not mst_dev_list or len(mst_dev_list) != 1:
|
|
devices = str(os.listdir(self.MST_DEVICE_PATH))
|
|
print("ERROR: Failed to get mst device: pattern={}, devices={}".format(pattern, devices))
|
|
return None
|
|
|
|
return mst_dev_list[0]
|
|
|
|
def __install_firmware(self, image_path):
|
|
if not self._check_file_validity(image_path):
|
|
return False
|
|
|
|
mst_dev = self.__get_mst_device()
|
|
if mst_dev is None:
|
|
return False
|
|
|
|
cmd = self.CPLD_FIRMWARE_UPDATE_COMMAND.format(mst_dev, image_path)
|
|
|
|
try:
|
|
print("INFO: Installing {} firmware update: path={}".format(self.name, image_path))
|
|
subprocess.check_call(cmd.split(), universal_newlines=True)
|
|
except subprocess.CalledProcessError as e:
|
|
print("ERROR: Failed to update {} firmware: {}".format(self.name, str(e)))
|
|
return False
|
|
|
|
return True
|
|
|
|
def get_firmware_version(self):
|
|
part_number_file = self.CPLD_PART_NUMBER_FILE.format(self.idx)
|
|
version_file = self.CPLD_VERSION_FILE.format(self.idx)
|
|
version_minor_file = self.CPLD_VERSION_MINOR_FILE.format(self.idx)
|
|
|
|
part_number = self._read_generic_file(part_number_file, self.CPLD_PART_NUMBER_MAX_LENGTH, True)
|
|
version = self._read_generic_file(version_file, self.CPLD_VERSION_MAX_LENGTH)
|
|
version_minor = self._read_generic_file(version_minor_file, self.CPLD_VERSION_MINOR_MAX_LENGTH, True)
|
|
|
|
if part_number is None:
|
|
part_number = self.CPLD_PART_NUMBER_DEFAULT
|
|
|
|
if version_minor is None:
|
|
version_minor = self.CPLD_VERSION_MINOR_DEFAULT
|
|
|
|
part_number = part_number.rstrip('\n').zfill(self.CPLD_PART_NUMBER_MAX_LENGTH)
|
|
version = version.rstrip('\n').zfill(self.CPLD_VERSION_MAX_LENGTH)
|
|
version_minor = version_minor.rstrip('\n').zfill(self.CPLD_VERSION_MINOR_MAX_LENGTH)
|
|
|
|
return "CPLD{}_REV{}{}".format(part_number, version, version_minor)
|
|
|
|
def get_available_firmware_version(self, image_path):
|
|
with MPFAManager(image_path) as mpfa:
|
|
if not mpfa.get_metadata().has_option('version', self.name):
|
|
raise RuntimeError("Failed to get {} available firmware version".format(self.name))
|
|
|
|
return mpfa.get_metadata().get('version', self.name)
|
|
|
|
def get_firmware_update_notification(self, image_path):
|
|
name, ext = os.path.splitext(os.path.basename(image_path))
|
|
if ext == self.COMPONENT_FIRMWARE_EXTENSION:
|
|
return "Power cycle (with 30 sec delay) or refresh image is required to complete {} firmware update".format(self.name)
|
|
|
|
return "Immediate power cycle is required to complete {} firmware update".format(self.name)
|
|
|
|
def install_firmware(self, image_path):
|
|
return self.__install_firmware(image_path)
|
|
|
|
def update_firmware(self, image_path):
|
|
with MPFAManager(image_path) as mpfa:
|
|
if not mpfa.get_metadata().has_option('firmware', 'burn'):
|
|
raise RuntimeError("Failed to get {} burn firmware".format(self.name))
|
|
if not mpfa.get_metadata().has_option('firmware', 'refresh'):
|
|
raise RuntimeError("Failed to get {} refresh firmware".format(self.name))
|
|
|
|
burn_firmware = mpfa.get_metadata().get('firmware', 'burn')
|
|
refresh_firmware = mpfa.get_metadata().get('firmware', 'refresh')
|
|
|
|
print("INFO: Processing {} burn file: firmware install".format(self.name))
|
|
if not self.__install_firmware(os.path.join(mpfa.get_path(), burn_firmware)):
|
|
return
|
|
|
|
print("INFO: Processing {} refresh file: firmware update".format(self.name))
|
|
self.__install_firmware(os.path.join(mpfa.get_path(), refresh_firmware))
|
|
|
|
@classmethod
|
|
def get_component_list(cls):
|
|
component_list = [ ]
|
|
|
|
cpld_number = cls._read_generic_file(cls.CPLD_NUMBER_FILE, cls.CPLD_NUMBER_MAX_LENGTH)
|
|
cpld_number = cpld_number.rstrip('\n')
|
|
|
|
for cpld_idx in range(1, int(cpld_number) + 1):
|
|
component_list.append(cls(cpld_idx))
|
|
|
|
return component_list
|