#!/usr/bin/env python import argparse import glob import json import os import re import subprocess import sys import time import unicodedata from sonic_py_common import device_info bmc_cache = {} cache = {} SONIC_CFGGEN_PATH = '/usr/local/bin/sonic-cfggen' HWSKU_KEY = 'DEVICE_METADATA.localhost.hwsku' PLATFORM_KEY = 'DEVICE_METADATA.localhost.platform' dirname = os.path.dirname(os.path.realpath(__file__)) color_map = { "STATUS_LED_COLOR_GREEN" : "green", "STATUS_LED_COLOR_RED" : "red", "STATUS_LED_COLOR_AMBER" : "amber", "STATUS_LED_COLOR_BLUE" : "blue", "STATUS_LED_COLOR_GREEN_BLINK" : "blinking green", "STATUS_LED_COLOR_RED_BLINK" : "blinking red", "STATUS_LED_COLOR_AMBER_BLINK" : "blinking amber", "STATUS_LED_COLOR_BLUE_BLINK" : "blinking blue", "STATUS_LED_COLOR_OFF" : "off" } class PddfParse(): def __init__(self): if not os.path.exists("/usr/share/sonic/platform"): platform, hwsku = device_info.get_platform_and_hwsku() os.symlink("/usr/share/sonic/device/"+platform, "/usr/share/sonic/platform") try: with open('/usr/share/sonic/platform/pddf/pddf-device.json') as f: self.data = json.load(f) except IOError: if os.path.exists('/usr/share/sonic/platform'): os.unlink("/usr/share/sonic/platform") raise Exception('PDDF JSON file not found. PDDF is not supported on this platform') self.data_sysfs_obj = {} self.sysfs_obj = {} ################################################################################################################### # GENERIC DEFS ################################################################################################################### def runcmd(self, cmd): rc = os.system(cmd) if rc != 0: print("%s -- command failed" % cmd) return rc def get_dev_idx(self, dev, ops): parent = dev['dev_info']['virt_parent'] pdev = self.data[parent] return pdev['dev_attr']['dev_idx'] def get_paths(self, target, attr): aa = target + attr if aa in cache: return cache[aa] strings = [] p = re.search(r'\d+$', target) if p is None: for bb in filter(re.compile(target).search, self.data.keys()): paths = self.dev_parse(self.data[bb], {"cmd": "show_attr", "target": bb, "attr": attr}) if paths: strings.extend(paths) else: if target in self.data.keys(): paths = self.dev_parse(self.data[target], {"cmd": "show_attr", "target": target, "attr": attr}) if paths: strings.extend(paths) cache[aa] = strings return strings def get_path(self, target, attr): nodes = self.get_paths(target, attr) if nodes: if len(nodes)==1: return nodes[0] # CAREFULL!!! If more than one paths are expected, use get_paths else: return nodes[0] else: return None def get_device_type(self, key): if key not in self.data.keys(): return None return self.data[key]['dev_info']['device_type'] def get_platform(self): return self.data['PLATFORM'] def get_num_psu_fans(self, dev): if dev not in self.data.keys(): return 0 if 'num_psu_fans' not in self.data[dev]['dev_attr']: return 0 return self.data[dev]['dev_attr']['num_psu_fans'] def get_led_path(self): return ("pddf/devices/led") def get_led_cur_state_path(self): return ("pddf/devices/led/cur_state") def get_led_color(self): color_f = "/sys/kernel/pddf/devices/led/cur_state/color" try: with open(color_f, 'r') as f: color = f.read().strip("\r\n") except IOError: return ("Error") return (color_map[color]) ################################################################################################################### # CREATE DEFS ################################################################################################################### def create_device(self, attr, path, ops): ret = 0 for key in attr.keys(): if type(attr[key]) is list: val = " ".join(attr[key]) else: val = attr[key] cmd = "echo '%s' > /sys/kernel/%s/%s" % (val, path, key) ret = self.runcmd(cmd) if ret != 0: return ret return ret def create_psu_i2c_device(self, dev, ops): create_ret = [] ret = 0 if dev['i2c']['topo_info']['dev_type'] in self.data['PLATFORM']['pddf_dev_types']['PSU']: ret = self.create_device(dev['i2c']['topo_info'], "pddf/devices/psu/i2c", ops) if ret != 0: return create_ret.append(ret) cmd = "echo '%s' > /sys/kernel/pddf/devices/psu/i2c/i2c_name" % (dev['dev_info']['device_name']) ret = self.runcmd(cmd) if ret != 0: return create_ret.append(ret) cmd = "echo '%s' > /sys/kernel/pddf/devices/psu/i2c/psu_idx" % (self.get_dev_idx(dev, ops)) ret = self.runcmd(cmd) if ret != 0: return create_ret.append(ret) for attr in dev['i2c']['attr_list']: ret = self.create_device(attr, "pddf/devices/psu/i2c", ops) if ret != 0: return create_ret.append(ret) cmd = "echo 'add' > /sys/kernel/pddf/devices/psu/i2c/attr_ops" ret = self.runcmd(cmd) if ret != 0: return create_ret.append(ret) cmd = "echo 'add' > /sys/kernel/pddf/devices/psu/i2c/dev_ops" ret = self.runcmd(cmd) if ret != 0: return create_ret.append(ret) else: cmd = "echo %s 0x%x > /sys/bus/i2c/devices/i2c-%d/new_device" % (dev['i2c']['topo_info']['dev_type'], int(dev['i2c']['topo_info']['dev_addr'], 0), int(dev['i2c']['topo_info']['parent_bus'], 0)) ret = self.runcmd(cmd) if ret != 0: return create_ret.append(ret) return create_ret.append(ret) def create_psu_bmc_device(self, dev, ops): return [0] def create_psu_device(self, dev, ops): return self.create_psu_i2c_device(dev, ops) def create_fan_device(self, dev, ops): create_ret = [] ret = 0 if dev['i2c']['topo_info']['dev_type'] in self.data['PLATFORM']['pddf_dev_types']['FAN']: ret = self.create_device(dev['i2c']['topo_info'], "pddf/devices/fan/i2c", ops) if ret != 0: return create_ret.append(ret) cmd = "echo '%s' > /sys/kernel/pddf/devices/fan/i2c/i2c_name" % (dev['dev_info']['device_name']) ret = self.runcmd(cmd) if ret != 0: return create_ret.append(ret) ret = self.create_device(dev['i2c']['dev_attr'], "pddf/devices/fan/i2c", ops) if ret != 0: return create_ret.append(ret) for attr in dev['i2c']['attr_list']: ret = self.create_device(attr, "pddf/devices/fan/i2c", ops) if ret != 0: return create_ret.append(ret) cmd = "echo 'add' > /sys/kernel/pddf/devices/fan/i2c/attr_ops" ret = self.runcmd(cmd) if ret != 0: return create_ret.append(ret) cmd = "echo 'add' > /sys/kernel/pddf/devices/fan/i2c/dev_ops" ret = self.runcmd(cmd) if ret != 0: return create_ret.append(ret) else: cmd = "echo %s 0x%x > /sys/bus/i2c/devices/i2c-%d/new_device" % (dev['i2c']['topo_info']['dev_type'], int(dev['i2c']['topo_info']['dev_addr'], 0), int(dev['i2c']['topo_info']['parent_bus'], 0)) ret = self.runcmd(cmd) if ret != 0: return create_ret.append(ret) return create_ret.append(ret) def create_temp_sensor_device(self, dev, ops): create_ret = [] ret = 0 # NO PDDF driver for temp_sensors device cmd = "echo %s 0x%x > /sys/bus/i2c/devices/i2c-%d/new_device" % (dev['i2c']['topo_info']['dev_type'], int(dev['i2c']['topo_info']['dev_addr'], 0), int(dev['i2c']['topo_info']['parent_bus'], 0)) ret = self.runcmd(cmd) return create_ret.append(ret) def create_cpld_device(self, dev, ops): create_ret = [] ret = 0 if dev['i2c']['topo_info']['dev_type'] in self.data['PLATFORM']['pddf_dev_types']['CPLD']: ret = self.create_device(dev['i2c']['topo_info'], "pddf/devices/cpld", ops) if ret != 0: return create_ret.append(ret) cmd = "echo '%s' > /sys/kernel/pddf/devices/cpld/i2c_name" % (dev['dev_info']['device_name']) ret = self.runcmd(cmd) if ret != 0: return create_ret.append(ret) # TODO: If attributes are provided then, use 'self.create_device' for them too cmd = "echo 'add' > /sys/kernel/pddf/devices/cpld/dev_ops" ret = self.runcmd(cmd) if ret != 0: return create_ret.append(ret) else: cmd = "echo %s 0x%x > /sys/bus/i2c/devices/i2c-%d/new_device" % (dev['i2c']['topo_info']['dev_type'], int(dev['i2c']['topo_info']['dev_addr'], 0), int(dev['i2c']['topo_info']['parent_bus'], 0)) ret = self.runcmd(cmd) if ret != 0: return create_ret.append(ret) return create_ret.append(ret) def create_cpldmux_device(self, dev, ops): create_ret = [] ret = 0 ret = self.create_device(dev['i2c']['topo_info'], "pddf/devices/cpldmux", ops) if ret != 0: return create_ret.append(ret) cmd = "echo '%s' > /sys/kernel/pddf/devices/mux/i2c_name" % (dev['dev_info']['device_name']) ret = self.runcmd(cmd) if ret != 0: return create_ret.append(ret) self.create_device(dev['i2c']['dev_attr'], "pddf/devices/cpldmux", ops) # Parse channel info for chan in dev['i2c']['channel']: self.create_device(chan, "pddf/devices/cpldmux", ops) cmd = "echo 'add' > /sys/kernel/pddf/devices/cpldmux/chan_ops" ret = self.runcmd(cmd) if ret != 0: return create_ret.append(ret) cmd = "echo 'add' > /sys/kernel/pddf/devices/cpldmux/dev_ops" ret = self.runcmd(cmd) return create_ret.append(ret) def create_gpio_device(self, dev, ops): create_ret = [] ret = 0 ret = self.create_device(dev['i2c']['topo_info'], "pddf/devices/gpio", ops) if ret != 0: return create_ret.append(ret) cmd = "echo '%s' > /sys/kernel/pddf/devices/gpio/i2c_name" % (dev['dev_info']['device_name']) ret = self.runcmd(cmd) if ret != 0: return create_ret.append(ret) ret = self.create_device(dev['i2c']['dev_attr'], "pddf/devices/gpio", ops) if ret != 0: return create_ret.append(ret) cmd = "echo 'add' > /sys/kernel/pddf/devices/gpio/dev_ops" ret = self.runcmd(cmd) if ret != 0: return create_ret.append(ret) time.sleep(2) base = dev['i2c']['dev_attr']['gpio_base'] for inst in dev['i2c']['ports']: if inst['port_num'] != "": port_no = int(base, 16) + int(inst['port_num']) cmd = "echo %d > /sys/class/gpio/export" % port_no ret = self.runcmd(cmd) if ret != 0: return create_ret.append(ret) if inst['direction'] != "": cmd = "echo %s >/sys/class/gpio/gpio%d/direction" % (inst['direction'], port_no) ret = self.runcmd(cmd) if ret != 0: return create_ret.append(ret) if inst['value'] != "": for i in inst['value'].split(','): cmd = "echo %s >/sys/class/gpio/gpio%d/value" % (i.rstrip(), port_no) ret = self.runcmd(cmd) if ret != 0: return create_ret.append(ret) return create_ret.append(ret) def create_mux_device(self, dev, ops): create_ret = [] ret = 0 ret = self.create_device(dev['i2c']['topo_info'], "pddf/devices/mux", ops) if ret != 0: return create_ret.append(ret) cmd = "echo '%s' > /sys/kernel/pddf/devices/mux/i2c_name" % (dev['dev_info']['device_name']) ret = self.runcmd(cmd) if ret != 0: return create_ret.append(ret) cmd = "echo %s > /sys/kernel/pddf/devices/mux/virt_bus" % (dev['i2c']['dev_attr']['virt_bus']) ret = self.runcmd(cmd) if ret != 0: return create_ret.append(ret) cmd = "echo 'add' > /sys/kernel/pddf/devices/mux/dev_ops" ret = self.runcmd(cmd) # Check if the dev_attr array contain idle_state if 'idle_state' in dev['i2c']['dev_attr']: cmd = "echo {} > /sys/bus/i2c/devices/{}-00{:02x}/idle_state".format(dev['i2c']['dev_attr']['idle_state'], int(dev['i2c']['topo_info']['parent_bus'],0), int(dev['i2c']['topo_info']['dev_addr'],0)) ret = self.runcmd(cmd) return create_ret.append(ret) def create_xcvr_i2c_device(self, dev, ops): create_ret = [] ret = 0 if dev['i2c']['topo_info']['dev_type'] in self.data['PLATFORM']['pddf_dev_types']['PORT_MODULE']: self.create_device(dev['i2c']['topo_info'], "pddf/devices/xcvr/i2c", ops) cmd = "echo '%s' > /sys/kernel/pddf/devices/xcvr/i2c/i2c_name" % (dev['dev_info']['device_name']) ret = self.runcmd(cmd) if ret != 0: return create_ret.append(ret) cmd = "echo '%s' > /sys/kernel/pddf/devices/xcvr/i2c/dev_idx" % (self.get_dev_idx(dev, ops)) ret = self.runcmd(cmd) if ret != 0: return create_ret.append(ret) for attr in dev['i2c']['attr_list']: self.create_device(attr, "pddf/devices/xcvr/i2c", ops) cmd = "echo 'add' > /sys/kernel/pddf/devices/xcvr/i2c/attr_ops" ret = self.runcmd(cmd) if ret != 0: return create_ret.append(ret) cmd = "echo 'add' > /sys/kernel/pddf/devices/xcvr/i2c/dev_ops" ret = self.runcmd(cmd) if ret != 0: return create_ret.append(ret) else: cmd = "echo %s 0x%x > /sys/bus/i2c/devices/i2c-%d/new_device" % (dev['i2c']['topo_info']['dev_type'], int(dev['i2c']['topo_info']['dev_addr'], 0), int(dev['i2c']['topo_info']['parent_bus'], 0)) ret = self.runcmd(cmd) # print("\n") if ret != 0: return create_ret.append(ret) # Add port name port_name_sysfs = '/sys/bus/i2c/devices/{}-00{:02x}/port_name'.format( int(dev['i2c']['topo_info']['parent_bus'], 0), int(dev['i2c']['topo_info']['dev_addr'], 0)) if os.path.exists(port_name_sysfs): cmd = "echo {} > /sys/bus/i2c/devices/{}-00{:02x}/port_name".format( dev['dev_info']['virt_parent'].lower(), int(dev['i2c']['topo_info']['parent_bus'], 0), int(dev['i2c']['topo_info']['dev_addr'], 0)) ret = self.runcmd(cmd) if ret != 0: return create_ret.append(ret) return create_ret.append(ret) def create_xcvr_bmc_device(self, dev, ops): return [0] def create_xcvr_device(self, dev, ops): return self.create_xcvr_i2c_device(dev, ops) def create_sysstatus_device(self, dev, ops): create_ret = [] ret = 0 for attr in dev['attr_list']: self.create_device(attr, "pddf/devices/sysstatus", ops) cmd = "echo 'add' > /sys/kernel/pddf/devices/sysstatus/attr_ops" ret = self.runcmd(cmd) if ret != 0: return create_ret.append(ret) def create_eeprom_device(self, dev, ops): create_ret = [] ret = 0 if "EEPROM" in self.data['PLATFORM']['pddf_dev_types'] and \ dev['i2c']['topo_info']['dev_type'] in self.data['PLATFORM']['pddf_dev_types']['EEPROM']: self.create_device(dev['i2c']['topo_info'], "pddf/devices/eeprom/i2c", ops) cmd = "echo '%s' > /sys/kernel/pddf/devices/eeprom/i2c/i2c_name" % (dev['dev_info']['device_name']) ret = self.runcmd(cmd) if ret != 0: return create_ret.append(ret) self.create_device(dev['i2c']['dev_attr'], "pddf/devices/eeprom/i2c", ops) cmd = "echo 'add' > /sys/kernel/pddf/devices/eeprom/i2c/dev_ops" ret = self.runcmd(cmd) if ret != 0: return create_ret.append(ret) else: cmd = "echo %s 0x%x > /sys/bus/i2c/devices/i2c-%d/new_device" % (dev['i2c']['topo_info']['dev_type'], int(dev['i2c']['topo_info']['dev_addr'], 0), int(dev['i2c']['topo_info']['parent_bus'], 0)) ret = self.runcmd(cmd) if ret != 0: return create_ret.append(ret) return create_ret.append(ret) ################################################################################################################### # DELETE DEFS ################################################################################################################### def delete_eeprom_device(self, dev, ops): if "EEPROM" in self.data['PLATFORM']['pddf_dev_types'] and \ dev['i2c']['topo_info']['dev_type'] in self.data['PLATFORM']['pddf_dev_types']['EEPROM']: cmd = "echo '%s' > /sys/kernel/pddf/devices/eeprom/i2c/i2c_name" % (dev['dev_info']['device_name']) self.runcmd(cmd) cmd = "echo 'delete' > /sys/kernel/pddf/devices/eeprom/i2c/dev_ops" self.runcmd(cmd) else: cmd = "echo 0x%x > /sys/bus/i2c/devices/i2c-%d/delete_device" % ( int(dev['i2c']['topo_info']['dev_addr'], 0), int(dev['i2c']['topo_info']['parent_bus'], 0)) self.runcmd(cmd) def delete_sysstatus_device(self, dev, ops): # NOT A PHYSICAL DEVICE.... rmmod on module would remove all the artifacts pass def delete_xcvr_i2c_device(self, dev, ops): if dev['i2c']['topo_info']['dev_type'] in self.data['PLATFORM']['pddf_dev_types']['PORT_MODULE']: cmd = "echo '%s' > /sys/kernel/pddf/devices/xcvr/i2c/i2c_name" % (dev['dev_info']['device_name']) self.runcmd(cmd) cmd = "echo 'delete' > /sys/kernel/pddf/devices/xcvr/i2c/dev_ops" self.runcmd(cmd) else: cmd = "echo 0x%x > /sys/bus/i2c/devices/i2c-%d/delete_device" % ( int(dev['i2c']['topo_info']['dev_addr'], 0), int(dev['i2c']['topo_info']['parent_bus'], 0)) self.runcmd(cmd) def delete_xcvr_device(self, dev, ops): self.delete_xcvr_i2c_device(dev, ops) return def delete_gpio_device(self, dev, ops): cmd = "echo '%s' > /sys/kernel/pddf/devices/gpio/i2c_name" % (dev['dev_info']['device_name']) self.runcmd(cmd) cmd = "echo 'delete' > /sys/kernel/pddf/devices/gpio/dev_ops" self.runcmd(cmd) def delete_mux_device(self, dev, ops): cmd = "echo '%s' > /sys/kernel/pddf/devices/mux/i2c_name" % (dev['dev_info']['device_name']) self.runcmd(cmd) cmd = "echo 'delete' > /sys/kernel/pddf/devices/mux/dev_ops" self.runcmd(cmd) def delete_cpld_device(self, dev, ops): if dev['i2c']['topo_info']['dev_type'] in self.data['PLATFORM']['pddf_dev_types']['CPLD']: cmd = "echo '%s' > /sys/kernel/pddf/devices/cpld/i2c_name" % (dev['dev_info']['device_name']) self.runcmd(cmd) cmd = "echo 'delete' > /sys/kernel/pddf/devices/cpld/dev_ops" self.runcmd(cmd) else: cmd = "echo 0x%x > /sys/bus/i2c/devices/i2c-%d/delete_device" % ( int(dev['i2c']['topo_info']['dev_addr'], 0), int(dev['i2c']['topo_info']['parent_bus'], 0)) self.runcmd(cmd) def delete_cpldmux_device(self, dev, ops): if dev['i2c']['topo_info']['dev_type'] in self.data['PLATFORM']['pddf_dev_types']['CPLDMUX']: cmd = "echo '%s' > /sys/kernel/pddf/devices/cpldmux/i2c_name" % (dev['dev_info']['device_name']) self.runcmd(cmd) cmd = "echo 'delete' > /sys/kernel/pddf/devices/cpldmux/dev_ops" self.runcmd(cmd) def delete_temp_sensor_device(self, dev, ops): # NO PDDF driver for temp_sensors device cmd = "echo 0x%x > /sys/bus/i2c/devices/i2c-%d/delete_device" % ( int(dev['i2c']['topo_info']['dev_addr'], 0), int(dev['i2c']['topo_info']['parent_bus'], 0)) self.runcmd(cmd) def delete_fan_device(self, dev, ops): if dev['i2c']['topo_info']['dev_type'] in self.data['PLATFORM']['pddf_dev_types']['FAN']: cmd = "echo '%s' > /sys/kernel/pddf/devices/fan/i2c/i2c_name" % (dev['dev_info']['device_name']) self.runcmd(cmd) cmd = "echo 'delete' > /sys/kernel/pddf/devices/fan/i2c/dev_ops" self.runcmd(cmd) else: cmd = "echo 0x%x > /sys/bus/i2c/devices/i2c-%d/delete_device" % ( int(dev['i2c']['topo_info']['dev_addr'], 0), int(dev['i2c']['topo_info']['parent_bus'], 0)) self.runcmd(cmd) def delete_psu_i2c_device(self, dev, ops): if dev['i2c']['topo_info']['dev_type'] in self.data['PLATFORM']['pddf_dev_types']['PSU']: cmd = "echo '%s' > /sys/kernel/pddf/devices/psu/i2c/i2c_name" % (dev['dev_info']['device_name']) self.runcmd(cmd) cmd = "echo 'delete' > /sys/kernel/pddf/devices/psu/i2c/dev_ops" self.runcmd(cmd) else: cmd = "echo 0x%x > /sys/bus/i2c/devices/i2c-%d/delete_device" % ( int(dev['i2c']['topo_info']['dev_addr'], 0), int(dev['i2c']['topo_info']['parent_bus'], 0)) self.runcmd(cmd) def delete_psu_device(self, dev, ops): self.delete_psu_i2c_device(dev, ops) return ################################################################################################################### # SHOW ATTRIBIUTES DEFS ################################################################################################################### def is_led_device_configured(self, device_name, attr_name): if device_name in self.data.keys(): attr_list = self.data[device_name]['i2c']['attr_list'] for attr in attr_list: if attr['attr_name'].strip() == attr_name.strip(): return (True) return (False) def show_device_sysfs(self, dev, ops): parent = dev['dev_info']['device_parent'] pdev = self.data[parent] if pdev['dev_info']['device_parent'] == 'SYSTEM': if 'topo_info' in pdev['i2c']: return "/sys/bus/i2c/devices/"+"i2c-%d"%int(pdev['i2c']['topo_info']['dev_addr'], 0) else: return "/sys/bus/i2c/devices" return self.show_device_sysfs(pdev, ops) + "/" + "i2c-%d" % int(dev['i2c']['topo_info']['parent_bus'], 0) def get_gpio_attr_path(self, dev, offset): base = int(dev['i2c']['dev_attr']['gpio_base'], 16) port_num = base + int(offset, 16) gpio_name = 'gpio'+str(port_num) path = '/sys/class/gpio/'+gpio_name+'/value' return path # 'at24' type of EEPROM devices have only one attribtue 'eeprom' def show_attr_eeprom_device(self, dev, ops): ret = [] attr_name = ops['attr'] attr_list = dev['i2c']['attr_list'] KEY = "eeprom" dsysfs_path = "" if KEY not in self.data_sysfs_obj: self.data_sysfs_obj[KEY] = [] for attr in attr_list: if attr_name == attr['attr_name'] or attr_name == 'all': if 'drv_attr_name' in attr.keys(): real_name = attr['drv_attr_name'] else: real_name = attr['attr_name'] dsysfs_path = self.show_device_sysfs(dev, ops) + \ "/%d-00%x" % (int(dev['i2c']['topo_info']['parent_bus'], 0), int(dev['i2c']['topo_info']['dev_addr'], 0)) + \ "/%s" % real_name if dsysfs_path not in self.data_sysfs_obj[KEY]: self.data_sysfs_obj[KEY].append(dsysfs_path) ret.append(dsysfs_path) return ret def show_attr_gpio_device(self, dev, ops): ret = [] KEY = "gpio" if KEY not in self.data_sysfs_obj: self.data_sysfs_obj[KEY] = [] return ret def show_attr_mux_device(self, dev, ops): ret = [] KEY = "mux" if KEY not in self.data_sysfs_obj: self.data_sysfs_obj[KEY] = [] return ret def show_attr_psu_i2c_device(self, dev, ops): target = ops['target'] attr_name = ops['attr'] ret = [] KEY = "psu" dsysfs_path = "" if KEY not in self.data_sysfs_obj: self.data_sysfs_obj[KEY] = [] if target == 'all' or target == dev['dev_info']['virt_parent']: attr_list = dev['i2c']['attr_list'] if 'i2c' in dev else [] for attr in attr_list: if attr_name == attr['attr_name'] or attr_name == 'all': if 'attr_devtype' in attr.keys() and attr['attr_devtype'] == "gpio": # Check and enable the gpio from class attr_path = self.get_gpio_attr_path(self.data[attr['attr_devname']], attr['attr_offset']) if (os.path.exists(attr_path)): if attr_path not in self.data_sysfs_obj[KEY]: self.data_sysfs_obj[KEY].append(attr_path) ret.append(attr_path) else: if 'drv_attr_name' in attr.keys(): real_name = attr['drv_attr_name'] real_dev = dev elif 'attr_devattr' in attr.keys(): real_name = attr['attr_devattr'] real_devname = attr['attr_devname'] if 'attr_devname' in attr.keys() else '' real_dev = self.data[real_devname] else: real_name = attr['attr_name'] real_dev = dev dsysfs_path = self.show_device_sysfs(real_dev, ops) + \ "/%d-00%x" % (int(real_dev['i2c']['topo_info']['parent_bus'], 0), int(real_dev['i2c']['topo_info']['dev_addr'], 0)) + \ "/%s" % real_name if dsysfs_path not in self.data_sysfs_obj[KEY]: self.data_sysfs_obj[KEY].append(dsysfs_path) ret.append(dsysfs_path) return ret def show_attr_psu_device(self, dev, ops): return self.show_attr_psu_i2c_device(dev, ops) def show_attr_fan_device(self, dev, ops): ret = [] attr_name = ops['attr'] attr_list = dev['i2c']['attr_list'] if 'i2c' in dev else [] KEY = "fan" dsysfs_path = "" if KEY not in self.data_sysfs_obj: self.data_sysfs_obj[KEY] = [] for attr in attr_list: if attr_name == attr['attr_name'] or attr_name == 'all': if 'attr_devtype' in attr.keys() and attr['attr_devtype'] == "gpio": # Check and enable the gpio from class attr_path = self.get_gpio_attr_path(self.data[attr['attr_devname']], attr['attr_offset']) if (os.path.exists(attr_path)): if attr_path not in self.data_sysfs_obj[KEY]: self.data_sysfs_obj[KEY].append(attr_path) ret.append(attr_path) else: if 'drv_attr_name' in attr.keys(): real_name = attr['drv_attr_name'] real_dev = dev elif 'attr_devattr' in attr.keys(): real_name = attr['attr_devattr'] real_devname = attr['attr_devname'] if 'attr_devname' in attr.keys() else '' real_dev = self.data[real_devname] else: real_name = attr['attr_name'] real_dev = dev dsysfs_path = self.show_device_sysfs(real_dev, ops) + \ "/%d-00%x" % (int(real_dev['i2c']['topo_info']['parent_bus'], 0), int(real_dev['i2c']['topo_info']['dev_addr'], 0)) + \ "/%s" % real_name if dsysfs_path not in self.data_sysfs_obj[KEY]: self.data_sysfs_obj[KEY].append(dsysfs_path) ret.append(dsysfs_path) return ret # This is only valid for LM75 def show_attr_temp_sensor_device(self, dev, ops): ret = [] attr_name = ops['attr'] attr_list = dev['i2c']['attr_list'] if 'i2c' in dev else [] KEY = "temp-sensors" dsysfs_path = "" if KEY not in self.data_sysfs_obj: self.data_sysfs_obj[KEY] = [] for attr in attr_list: if attr_name == attr['attr_name'] or attr_name == 'all': path = self.show_device_sysfs(dev, ops) + \ "/%d-00%x/" % (int(dev['i2c']['topo_info']['parent_bus'], 0), int(dev['i2c']['topo_info']['dev_addr'], 0)) if 'drv_attr_name' in attr.keys(): real_name = attr['drv_attr_name'] else: real_name = attr['attr_name'] if (os.path.exists(path)): full_path = glob.glob(path + 'hwmon/hwmon*/' + real_name)[0] dsysfs_path = full_path if dsysfs_path not in self.data_sysfs_obj[KEY]: self.data_sysfs_obj[KEY].append(dsysfs_path) ret.append(full_path) return ret def show_attr_sysstatus_device(self, dev, ops): ret = [] attr_name = ops['attr'] attr_list = dev['attr_list'] KEY = "sys-status" dsysfs_path = "" if KEY not in self.data_sysfs_obj: self.data_sysfs_obj[KEY] = [] for attr in attr_list: if attr_name == attr['attr_name'] or attr_name == 'all': dsysfs_path = "/sys/kernel/pddf/devices/sysstatus/sysstatus_data/" + attr['attr_name'] if dsysfs_path not in self.data_sysfs_obj[KEY]: self.data_sysfs_obj[KEY].append(dsysfs_path) ret.append(dsysfs_path) return ret def show_attr_xcvr_i2c_device(self, dev, ops): target = ops['target'] attr_name = ops['attr'] ret = [] dsysfs_path = "" KEY = "xcvr" if KEY not in self.data_sysfs_obj: self.data_sysfs_obj[KEY] = [] if target == 'all' or target == dev['dev_info']['virt_parent']: attr_list = dev['i2c']['attr_list'] for attr in attr_list: if attr_name == attr['attr_name'] or attr_name == 'all': if 'attr_devtype' in attr.keys() and attr['attr_devtype'] == "gpio": # Check and enable the gpio from class attr_path = self.get_gpio_attr_path(self.data[attr['attr_devname']], attr['attr_offset']) if (os.path.exists(attr_path)): if attr_path not in self.data_sysfs_obj[KEY]: self.data_sysfs_obj[KEY].append(attr_path) ret.append(attr_path) else: if 'drv_attr_name' in attr.keys(): real_name = attr['drv_attr_name'] real_dev = dev elif 'attr_devattr' in attr.keys(): real_name = attr['attr_devattr'] real_devname = attr['attr_devname'] if 'attr_devname' in attr.keys() else '' real_dev = self.data[real_devname] else: real_name = attr['attr_name'] real_dev = dev dsysfs_path = self.show_device_sysfs(real_dev, ops) + \ "/%d-00%x" % (int(real_dev['i2c']['topo_info']['parent_bus'], 0), int(real_dev['i2c']['topo_info']['dev_addr'], 0)) + \ "/%s" % real_name if dsysfs_path not in self.data_sysfs_obj[KEY]: self.data_sysfs_obj[KEY].append(dsysfs_path) ret.append(dsysfs_path) return ret def show_attr_xcvr_device(self, dev, ops): return self.show_attr_xcvr_i2c_device(dev, ops) def show_attr_cpld_device(self, dev, ops): ret = [] KEY = "cpld" if KEY not in self.data_sysfs_obj: self.data_sysfs_obj[KEY] = [] return ret def show_attr_fpgai2c_device(self, dev, ops): ret = [] KEY="fpgai2c" if KEY not in self.data_sysfs_obj: self.data_sysfs_obj[KEY]=[] return ret def show_attr_cpldmux_device(self, dev, ops): ret = [] KEY = "cpldmux" if KEY not in self.data_sysfs_obj: self.data_sysfs_obj[KEY] = [] return ret def show_attr_fpgapci_device(self, dev, ops): ret = [] KEY="fpgapci" if KEY not in self.data_sysfs_obj: self.data_sysfs_obj[KEY]=[] return ret ################################################################################################################### # SHOW DEFS ################################################################################################################### def check_led_cmds(self, key, ops): name = ops['target']+'_LED' if (ops['target'] == 'config' or ops['attr'] == 'all') or \ (name == self.data[key]['dev_info']['device_name'] and ops['attr'] == self.data[key]['dev_attr']['index']): return (True) else: return (False) def dump_sysfs_obj(self, obj, key_type): if (key_type == 'keys'): for key in obj.keys(): print(key) return for key in obj: if (key == key_type or key_type == 'all'): print(key+":") for entry in obj[key]: print("\t"+entry) def add_list_sysfs_obj(self, obj, KEY, list): for sysfs in list: if not sysfs in obj[KEY]: obj[KEY].append(sysfs) def sysfs_attr(self, key, value, path, obj, obj_key): sysfs_path = "/sys/kernel/%s/%s" % (path, key) if not sysfs_path in obj[obj_key]: obj[obj_key].append(sysfs_path) def sysfs_device(self, attr, path, obj, obj_key): for key in attr.keys(): sysfs_path = "/sys/kernel/%s/%s" % (path, key) if not sysfs_path in obj[obj_key]: obj[obj_key].append(sysfs_path) def show_eeprom_device(self, dev, ops): return def show_mux_device(self, dev, ops): KEY = 'mux' if not KEY in self.sysfs_obj: self.sysfs_obj[KEY] = [] self.sysfs_device(dev['i2c']['topo_info'], "pddf/devices/mux", self.sysfs_obj, KEY) self.sysfs_device(dev['i2c']['dev_attr'], "pddf/devices/mux", self.sysfs_obj, KEY) sysfs_path = "/sys/kernel/pddf/devices/mux/dev_ops" if not sysfs_path in self.sysfs_obj[KEY]: self.sysfs_obj[KEY].append(sysfs_path) list = ['/sys/kernel/pddf/devices/mux/i2c_type', '/sys/kernel/pddf/devices/mux/i2c_name', '/sys/kernel/pddf/devices/mux/error'] self.add_list_sysfs_obj(self.sysfs_obj, KEY, list) def show_gpio_device(self, dev, ops): KEY = 'gpio' if not KEY in self.sysfs_obj: self.sysfs_obj[KEY] = [] self.sysfs_device(dev['i2c']['topo_info'], "pddf/devices/gpio", self.sysfs_obj, KEY) self.sysfs_device(dev['i2c']['dev_attr'], "pddf/devices/gpio", self.sysfs_obj, KEY) sysfs_path = "/sys/kernel/pddf/devices/gpio/dev_ops" if not sysfs_path in self.sysfs_obj[KEY]: self.sysfs_obj[KEY].append(sysfs_path) list = ['/sys/kernel/pddf/devices/gpio/i2c_type', '/sys/kernel/pddf/devices/gpio/i2c_name', '/sys/kernel/pddf/devices/gpio/error'] self.add_list_sysfs_obj(self.sysfs_obj, KEY, list) def show_psu_i2c_device(self, dev, ops): KEY = 'psu' path = 'pddf/devices/psu/i2c' if dev['i2c']['topo_info']['dev_type'] in self.data['PLATFORM']['pddf_dev_types']['PSU']: if not KEY in self.sysfs_obj: self.sysfs_obj[KEY] = [] self.sysfs_device(dev['i2c']['topo_info'], path, self.sysfs_obj, KEY) sysfs_path = "/sys/kernel/pddf/devices/psu/i2c/psu_idx" self.sysfs_obj[KEY].append(sysfs_path) for attr in dev['i2c']['attr_list']: self.sysfs_device(attr, "pddf/devices/psu/i2c", self.sysfs_obj, KEY) sysfs_path = "/sys/kernel/pddf/devices/psu/i2c/dev_ops" if not sysfs_path in self.sysfs_obj[KEY]: self.sysfs_obj[KEY].append(sysfs_path) list = ['/sys/kernel/pddf/devices/psu/i2c/i2c_type', '/sys/kernel/pddf/devices/fan/i2c/i2c_name', '/sys/kernel/pddf/devices/psu/i2c/error', '/sys/kernel/pddf/devices/psu/i2c/attr_ops'] self.add_list_sysfs_obj(self.sysfs_obj, KEY, list) def show_psu_device(self, dev, ops): self.show_psu_i2c_device(dev, ops) return def show_client_device(self): KEY = 'client' if not KEY in self.sysfs_obj: self.sysfs_obj[KEY] = [] list = ['/sys/kernel/pddf/devices/showall'] self.add_list_sysfs_obj(self.sysfs_obj, KEY, list) def show_fan_device(self, dev, ops): KEY = 'fan' path = 'pddf/devices/fan/i2c' if dev['i2c']['topo_info']['dev_type'] in self.data['PLATFORM']['pddf_dev_types']['FAN']: if not KEY in self.sysfs_obj: self.sysfs_obj[KEY] = [] self.sysfs_device(dev['i2c']['topo_info'], path, self.sysfs_obj, KEY) self.sysfs_device(dev['i2c']['dev_attr'], path, self.sysfs_obj, KEY) for attr in dev['i2c']['attr_list']: self.sysfs_device(attr, path, self.sysfs_obj, KEY) list = ['/sys/kernel/pddf/devices/fan/i2c/i2c_type', '/sys/kernel/pddf/devices/fan/i2c/i2c_name', '/sys/kernel/pddf/devices/fan/i2c/error', '/sys/kernel/pddf/devices/fan/i2c/attr_ops', '/sys/kernel/pddf/devices/fan/i2c/dev_ops'] self.add_list_sysfs_obj(self.sysfs_obj, KEY, list) def show_temp_sensor_device(self, dev, ops): return def show_sysstatus_device(self, dev, ops): KEY = 'sysstatus' if not KEY in self.sysfs_obj: self.sysfs_obj[KEY] = [] for attr in dev['attr_list']: self.sysfs_device(attr, "pddf/devices/sysstatus", self.sysfs_obj, KEY) sysfs_path = "/sys/kernel/pddf/devices/sysstatus/attr_ops" if not sysfs_path in self.sysfs_obj[KEY]: self.sysfs_obj[KEY].append(sysfs_path) def show_xcvr_i2c_device(self, dev, ops): KEY = 'xcvr' if dev['i2c']['topo_info']['dev_type'] in self.data['PLATFORM']['pddf_dev_types']['PORT_MODULE']: if not KEY in self.sysfs_obj: self.sysfs_obj[KEY] = [] self.sysfs_device(dev['i2c']['topo_info'], "pddf/devices/xcvr/i2c", self.sysfs_obj, KEY) for attr in dev['i2c']['attr_list']: self.sysfs_device(attr, "pddf/devices/xcvr/i2c", self.sysfs_obj, KEY) sysfs_path = "/sys/kernel/pddf/devices/xcvr/i2c/dev_ops" if not sysfs_path in self.sysfs_obj[KEY]: self.sysfs_obj[KEY].append(sysfs_path) list = ['/sys/kernel/pddf/devices/xcvr/i2c/i2c_type', '/sys/kernel/pddf/devices/xcvr/i2c/i2c_name', '/sys/kernel/pddf/devices/xcvr/i2c/error', '/sys/kernel/pddf/devices/xcvr/i2c/attr_ops'] self.add_list_sysfs_obj(self.sysfs_obj, KEY, list) def show_xcvr_device(self, dev, ops): self.show_xcvr_i2c_device(dev, ops) return def show_cpld_device(self, dev, ops): KEY = 'cpld' if dev['i2c']['topo_info']['dev_type'] in self.data['PLATFORM']['pddf_dev_types']['CPLD']: if not KEY in self.sysfs_obj: self.sysfs_obj[KEY] = [] self.sysfs_device(dev['i2c']['topo_info'], "pddf/devices/cpld", self.sysfs_obj, KEY) sysfs_path = "/sys/kernel/pddf/devices/cpld/dev_ops" if not sysfs_path in self.sysfs_obj[KEY]: self.sysfs_obj[KEY].append(sysfs_path) list = ['/sys/kernel/pddf/devices/cpld/i2c_type', '/sys/kernel/pddf/devices/cpld/i2c_name', '/sys/kernel/pddf/devices/cpld/error'] self.add_list_sysfs_obj(self.sysfs_obj, KEY, list) def show_led_platform_device(self, key, ops): if ops['attr'] == 'all' or ops['attr'] == 'PLATFORM': KEY = 'platform' if not KEY in self.sysfs_obj: self.sysfs_obj[KEY] = [] path = 'pddf/devices/platform' self.sysfs_attr('num_psus', self.data['PLATFORM']['num_psus'], path, self.sysfs_obj, KEY) self.sysfs_attr('num_fantrays', self.data['PLATFORM']['num_fantrays'], path, self.sysfs_obj, KEY) def show_led_device(self, key, ops): if self.check_led_cmds(key, ops): KEY='led' if not KEY in self.sysfs_obj: self.sysfs_obj[KEY] = [] path="pddf/devices/led" for attr in self.data[key]['i2c']['attr_list']: self.sysfs_attr('device_name', self.data[key]['dev_info']['device_name'],path,self.sysfs_obj,KEY) self.sysfs_attr('swpld_addr', self.data[key]['dev_info']['device_name'],path,self.sysfs_obj, KEY) self.sysfs_attr('swpld_addr_offset',self.data[key]['dev_info']['device_name'], path,self.sysfs_obj, KEY) self.sysfs_device(self.data[key]['dev_attr'], path, self.sysfs_obj, KEY) for attr_key in attr.keys(): attr_path = "pddf/devices/led/" + attr['attr_name'] if (attr_key != 'attr_name' and attr_key != 'swpld_addr' and attr_key != 'swpld_addr_offset'): self.sysfs_attr(attr_key, attr[attr_key], attr_path, self.sysfs_obj, KEY) sysfs_path="/sys/kernel/pddf/devices/led/dev_ops" if not sysfs_path in self.sysfs_obj[KEY]: self.sysfs_obj[KEY].append(sysfs_path) list=['/sys/kernel/pddf/devices/led/cur_state/color'] self.add_list_sysfs_obj(self.sysfs_obj, KEY, list) def validate_xcvr_device(self, dev, ops): devtype_list = ['optoe1', 'optoe2'] dev_attribs = ['xcvr_present', 'xcvr_reset', 'xcvr_intr_status', 'xcvr_lpmode'] ret_val = "xcvr validation failed" if dev['i2c']['topo_info']['dev_type'] in devtype_list: for attr in dev['i2c']['attr_list']: if 'attr_name' in attr.keys() and 'eeprom' in attr.values(): ret_val = "xcvr validation success" else: print("xcvr validation Failed") return elif dev['i2c']['topo_info']['dev_type'] in self.data['PLATFORM']['pddf_dev_types']['PORT_MODULE']: for attr in dev['i2c']['attr_list']: if attr.get("attr_name") in dev_attribs: ret_val = "Success" else: print("xcvr validation Failed") return print(ret_val) def validate_eeprom_device(self, dev, ops): devtype_list = ['24c02'] dev_access_mode = ['BLOCK', 'BYTE'] dev_attribs = ['eeprom'] ret_val = "eeprom failed" if dev['i2c']['topo_info']['dev_type'] in devtype_list: if dev['i2c']['dev_attr']['access_mode'] in dev_access_mode: for attr in dev['i2c']['attr_list']: if attr.get("attr_name") in dev_attribs: ret_val = "eeprom success" print(ret_val) def validate_mux_device(self, dev, ops): devtype_list = ['pca9548', 'pca954x'] dev_channels = ["0", "1", "2", "3", "4", "5", "6", "7"] ret_val = "mux failed" if dev['i2c']['topo_info']['dev_type'] in devtype_list: for attr in dev['i2c']['channel']: if attr.get("chn") in dev_channels: ret_val = "Mux success" print(ret_val) def validate_cpld_device(self, dev, ops): devtype_list = ['i2c_cpld'] ret_val = "cpld failed" if dev['i2c']['topo_info']['dev_type'] in devtype_list: ret_val = "cpld success" print(ret_val) def validate_sysstatus_device(self, dev, ops): dev_attribs = ['board_info', 'cpld1_version', 'power_module_status', 'system_reset5', 'system_reset6', 'system_reset7', 'misc1', 'cpld2_version', 'cpld3_version' ] ret_val = "sysstatus failed" if dev['dev_info']['device_type'] == "SYSSTAT": for attr in dev['attr_list']: if attr.get("attr_name") in dev_attribs: ret_val = "sysstatus success" print(ret_val) def validate_temp_sensor_device(self, dev, ops): devtype_list = ['lm75'] dev_attribs = ['temp1_max', 'temp1_max_hyst', 'temp1_input'] ret_val = "temp sensor failed" if dev['dev_info']['device_type'] == "TEMP_SENSOR": if dev['i2c']['topo_info']['dev_type'] in devtype_list: for attr in dev['i2c']['attr_list']: if attr.get("attr_name") in dev_attribs: ret_val = "tempsensor success" print(ret_val) def validate_fan_device(self, dev, ops): ret_val = "fan failed" if dev['i2c']['topo_info']['dev_type'] in self.data['PLATFORM']['pddf_dev_types']['FAN']: if dev['i2c']['dev_attr']['num_fantrays'] is not None: ret_val = "fan success" print(ret_val) def validate_psu_device(self, dev, ops): dev_attribs = ['psu_present', 'psu_model_name', 'psu_power_good', 'psu_mfr_id', 'psu_serial_num', 'psu_fan_dir', 'psu_v_out', 'psu_i_out', 'psu_p_out', 'psu_fan1_speed_rpm' ] ret_val = "psu failed" if dev['i2c']['topo_info']['dev_type'] in self.data['PLATFORM']['pddf_dev_types']['PSU']: for attr in dev['i2c']['attr_list']: if attr.get("attr_name") in dev_attribs: if attr.get("attr_devaddr") is not None: if attr.get("attr_offset") is not None: if attr.get("attr_mask") is not None: if attr.get("attr_len") is not None: ret_val = "psu success" else: ret_val = "psu failed" print(ret_val) ################################################################################################################### # SPYTEST ################################################################################################################### def verify_attr(self, key, attr, path): node = "/sys/kernel/%s/%s" % (path, key) try: with open(node, 'r') as f: status = f.read() except IOError: print("PDDF_VERIFY_ERR: IOError: node:%s key:%s" % (node, key)) return status = status.rstrip("\n\r") if attr[key] != status: print("PDDF_VERIFY_ERR: node: %s switch:%s" % (node, status)) def verify_device(self, attr, path, ops): for key in attr.keys(): self.verify_attr(key, attr, path) def get_led_device(self, device_name): self.create_attr('device_name', self.data[device_name]['dev_info']['device_name'], "pddf/devices/led") self.create_attr('index', self.data[device_name]['dev_attr']['index'], "pddf/devices/led") cmd = "echo 'verify' > /sys/kernel/pddf/devices/led/dev_ops" self.runcmd(cmd) def validate_sysfs_creation(self, obj, validate_type): dir = '/sys/kernel/pddf/devices/'+validate_type if (os.path.exists(dir) or validate_type == 'client'): for sysfs in obj[validate_type]: if(not os.path.exists(sysfs)): print("[SYSFS FILE] " + sysfs + ": does not exist") else: print("[SYSFS DIR] " + dir + ": does not exist") def validate_dsysfs_creation(self, obj, validate_type): if validate_type in obj.keys(): # There is a possibility that some components dont have any device-self.data attr if not obj[validate_type]: print("[SYSFS ATTR] for " + validate_type + ": empty ") else: for sysfs in obj[validate_type]: if(not os.path.exists(sysfs)): print("[SYSFS FILE] " + sysfs + ": does not exist") else: print("[SYSFS KEY] for " + validate_type + ": not configured") def verify_sysfs_data(self, verify_type): if (verify_type == 'LED'): for key in self.data.keys(): if key != 'PLATFORM': attr = self.data[key]['dev_info'] if attr['device_type'] == 'LED': self.get_led_device(key) self.verify_attr('device_name', self.data[key]['dev_info'], "pddf/devices/led") self.verify_attr('index', self.data[key]['dev_attr'], "pddf/devices/led") for attr in self.data[key]['i2c']['attr_list']: path = "pddf/devices/led/" + attr['attr_name'] for entry in attr.keys(): if (entry != 'attr_name' and entry != 'swpld_addr' and entry != 'swpld_addr_offset'): self.verify_attr(entry, attr, path) if (entry == 'swpld_addr' or entry == 'swpld_addr_offset'): self.verify_attr(entry, attr, 'pddf/devices/led') def schema_validation(self, validate_type): process_validate_type = 0 for key in self.data.keys(): if (key != 'PLATFORM'): temp_obj = {} schema_list = [] try: device_type = self.data[key]["dev_info"]["device_type"] except Exception as e: print("dev_info or device_type ERROR: " + key) print(e) if validate_type == 'mismatch': process_validate_type = 1 device_type = "PSU" schema_file = "/usr/local/bin/schema/FAN.schema" schema_list.append(schema_file) elif validate_type == 'missing': process_validate_type = 1 schema_file = "/usr/local/bin/schema/PLATFORM.schema" schema_list.append(schema_file) elif validate_type == 'empty': process_validate_type = 1 if not device_type: print("Empty device_type for " + key) continue elif (validate_type == 'all' or validate_type == device_type): process_validate_type = 1 if "bmc" in self.data[key].keys(): schema_file = "/usr/local/bin/schema/"+device_type + "_BMC.schema" schema_list.append(schema_file) if "i2c" in self.data[key].keys(): schema_file = "/usr/local/bin/schema/"+device_type + ".schema" schema_list.append(schema_file) if device_type: temp_obj[device_type] = self.data[key] for schema_file in schema_list: if (os.path.exists(schema_file)): print("Validate " + schema_file + ";" + key) json_data = json.dumps(temp_obj) with open(schema_file, 'r') as f: schema = json.load(f) try: from jsonschema import validate validate(temp_obj, schema) except Exception as e: print("Validation ERROR: " + schema_file + ";" + key) if validate_type == 'mismatch': return else: print(e) else: print("ERROR Missing File: " + schema_file) if not process_validate_type: print("device_type: " + validate_type + " not configured") def modules_validation(self, validate_type): kos = [] supported_type = False module_validation_status = [] if validate_type == "bmc": kos = ['ipmi_devintf', 'ipmi_si', 'ipmi_msghandler'] validate_type = 'ipmi' else: # generate the KOS list from pddf device JSON file kos.extend(self.data['PLATFORM']['pddf_kos']) if 'custom_kos' in self.data['PLATFORM']: kos.extend(self.data['PLATFORM']['custom_kos']) for mod in kos: if validate_type in mod or validate_type == "pddf": supported_type = True cmd = "lsmod | grep " + mod try: subprocess.check_output(cmd, shell=True) except Exception as e: module_validation_status.append(mod) if supported_type: if module_validation_status: module_validation_status.append(":ERROR not loaded") print(str(module_validation_status)[1:-1]) else: print("Loaded") else: print(validate_type + " not configured" ) ################################################################################################################### # PARSE DEFS ################################################################################################################### def psu_parse(self, dev, ops): ret = [] for ifce in (dev['i2c']['interface'] if 'i2c' in dev else []): val = getattr(self, ops['cmd']+"_psu_device")(self.data[ifce['dev']], ops) if val: if str(val[0]).isdigit(): if val[0] != 0: # in case if 'create' functions print("{}_psu_device failed for {}".format(ops['cmd'], ifce['dev'])) return val else: # in case of 'show_attr' functions ret.extend(val) return ret def fan_parse(self, dev, ops): ret = [] ret = getattr(self, ops['cmd']+"_fan_device")(dev, ops) if ret: if str(ret[0]).isdigit(): if ret[0] != 0: # in case if 'create' functions print("{}_fan_device failed for {}".format(ops['cmd'], dev['dev_info']['device_name'])) return ret def temp_sensor_parse(self, dev, ops): ret = [] ret = getattr(self, ops['cmd']+"_temp_sensor_device")(dev, ops) if ret: if str(ret[0]).isdigit(): if ret[0] != 0: # in case if 'create' functions print("{}_temp_sensor_device failed for {}".format(ops['cmd'], dev['dev_info']['device_name'])) return ret def cpld_parse(self, dev, ops): ret = [] ret = getattr(self, ops['cmd']+"_cpld_device")(dev, ops) if ret: if str(ret[0]).isdigit(): if ret[0] != 0: # in case if 'create' functions print("{}_cpld_device failed for {}".format(ops['cmd'], dev['dev_info']['device_name'])) return ret def fpgai2c_parse(self, dev, ops): val = [] ret = getattr(self, ops['cmd']+"_fpgai2c_device")(dev, ops) if ret: if str(ret[0]).isdigit(): if ret[0]!=0: # in case if 'create' functions print("{}_fpgai2c_device failed".format(ops['cmd'])) return ret else: # in case of 'show_attr' functions val.extend(ret) return val def cpldmux_parse(self, dev, ops): val = [] ret = getattr(self, ops['cmd']+"_cpldmux_device")(dev, ops) if ret: if str(ret[0]).isdigit(): if ret[0] != 0: # in case if 'create' functions print("{}_cpldmux_device() cmd failed".format(ops['cmd'])) return ret else: val.extend(ret) for chan in dev['i2c']['channel']: for device in chan['dev']: ret = self.dev_parse(self.data[device], ops) if ret: if str(ret[0]).isdigit(): if ret[0] != 0: # in case if 'create' functions return ret else: val.extend(ret) return val def cpldmux_parse_reverse(self, dev, ops): val = [] for chan in reversed(dev['i2c']['channel']): for device in reversed(chan['dev']): ret = self.dev_parse(self.data[device], ops) if ret: if str(ret[0]).isdigit(): if ret[0] != 0: # in case if 'create' functions return ret else: val.extend(ret) ret = getattr(self, ops['cmd']+"_cpldmux_device")(dev, ops) if ret: if str(ret[0]).isdigit(): if ret[0] != 0: # in case if 'create' functions print("{}_cpldmux_device() cmd failed".format(ops['cmd'])) return ret else: val.extend(ret) return val def sysstatus_parse(self, dev, ops): ret = [] ret = getattr(self, ops['cmd']+"_sysstatus_device")(dev, ops) if ret: if str(ret[0]).isdigit(): if ret[0] != 0: # in case if 'create' functions print("{}_sysstatus_device failed for {}".format(ops['cmd'], dev['dev_info']['device_name'])) return ret def gpio_parse(self, dev, ops): ret = [] ret = getattr(self, ops['cmd']+"_gpio_device")(dev, ops) if ret: if str(ret[0]).isdigit(): if ret[0] != 0: # in case if 'create' functions print("{}_gpio_device failed for {}".format(ops['cmd'], dev['dev_info']['device_name'])) return ret def mux_parse(self, dev, ops): val = [] ret = getattr(self, ops['cmd']+"_mux_device")(dev, ops) if ret: if str(ret[0]).isdigit(): if ret[0] != 0: # in case if 'create' functions print("{}_mux_device() cmd failed for {}".format(ops['cmd'], dev['dev_info']['device_name'])) return ret else: val.extend(ret) for ch in dev['i2c']['channel']: ret = self.dev_parse(self.data[ch['dev']], ops) if ret: if str(ret[0]).isdigit(): if ret[0] != 0: # in case if 'create' functions return ret else: val.extend(ret) return val def mux_parse_reverse(self, dev, ops): val = [] for ch in reversed(dev['i2c']['channel']): ret = self.dev_parse(self.data[ch['dev']], ops) if ret: if str(ret[0]).isdigit(): if ret[0] != 0: # in case if 'create' functions return ret else: val.extend(ret) ret = getattr(self, ops['cmd']+"_mux_device")(dev, ops) if ret: if str(ret[0]).isdigit(): if ret[0] != 0: # in case if 'create' functions print("{}_mux_device() cmd failed for {}".format(ops['cmd'], dev['dev_info']['device_name'])) return ret else: val.extend(ret) return val def eeprom_parse(self, dev, ops): ret = [] ret = getattr(self, ops['cmd']+"_eeprom_device")(dev, ops) if ret: if str(ret[0]).isdigit(): if ret[0] != 0: # in case if 'create' functions print("{}_eeprom_device() cmd failed for {}".format(ops['cmd'], dev['dev_info']['device_name'])) return ret def optic_parse(self, dev, ops): val = [] for ifce in dev['i2c']['interface']: ret = getattr(self, ops['cmd']+"_xcvr_device")(self.data[ifce['dev']], ops) if ret: if str(ret[0]).isdigit(): if ret[0] != 0: # in case if 'create' functions print("{}_xcvr_device() cmd failed for {}".format(ops['cmd'], ifce['dev'])) return ret else: val.extend(ret) return val def cpu_parse(self, bus, ops): val = [] for dev in bus['i2c']['CONTROLLERS']: dev1 = self.data[dev['dev']] for d in dev1['i2c']['DEVICES']: ret = self.dev_parse(self.data[d['dev']], ops) if ret: if str(ret[0]).isdigit(): if ret[0] != 0: # in case if 'create' functions return ret else: val.extend(ret) return val def cpu_parse_reverse(self, bus, ops): val = [] for dev in reversed(bus['i2c']['CONTROLLERS']): dev1 = self.data[dev['dev']] for d in reversed(dev1['i2c']['DEVICES']): ret = self.dev_parse(self.data[d['dev']], ops) if ret: if str(ret[0]).isdigit(): if ret[0] != 0: # in case if 'create' functions return ret else: val.extend(ret) return val # 'create' and 'show_attr' ops returns an array # 'delete', 'show' and 'validate' ops return None def dev_parse(self, dev, ops): attr=dev['dev_info'] if attr['device_type'] == 'CPU': if ops['cmd']=='delete': return self.cpu_parse_reverse(dev, ops) else: return self.cpu_parse(dev, ops) if attr['device_type'] == 'EEPROM': return self.eeprom_parse(dev, ops) if attr['device_type'] == 'MUX': if ops['cmd']=='delete': return self.mux_parse_reverse(dev, ops) else: return self.mux_parse(dev, ops) if attr['device_type'] == 'GPIO': return self.gpio_parse(dev, ops) if attr['device_type'] == 'PSU': return self.psu_parse(dev, ops) if attr['device_type'] == 'FAN': return self.fan_parse(dev, ops) if attr['device_type'] == 'TEMP_SENSOR': return self.temp_sensor_parse(dev, ops) if attr['device_type'] == 'SFP' or attr['device_type'] == 'SFP+' or attr['device_type'] == 'SFP28' or \ attr['device_type'] == 'QSFP' or attr['device_type'] == 'QSFP+' or attr['device_type'] == 'QSFP28' or \ attr['device_type'] == 'QSFP-DD': return self.optic_parse(dev, ops) if attr['device_type'] == 'CPLD': return self.cpld_parse(dev, ops) if attr['device_type'] == 'CPLDMUX': if ops['cmd'] == 'delete': return self.cpldmux_parse_reverse(dev, ops) else: return self.cpldmux_parse(dev, ops) if attr['device_type'] == 'SYSSTAT': return self.sysstatus_parse(dev, ops) def is_supported_sysled_state(self, sysled_name, sysled_state): if not sysled_name in self.data.keys(): return False, "[FAILED] " + sysled_name + " is not configured" for attr in self.data[sysled_name]['i2c']['attr_list']: if attr['attr_name'] == sysled_state: return True, "[PASS] supported" return False, "[FAILED]: Invalid color" def create_attr(self, key, value, path): cmd = "echo '%s' > /sys/kernel/%s/%s" % (value, path, key) self.runcmd(cmd) def create_led_platform_device(self, key, ops): if ops['attr'] == 'all' or ops['attr'] == 'PLATFORM': path = 'pddf/devices/platform' self.create_attr('num_psus', self.data['PLATFORM']['num_psus'], path) self.create_attr('num_fantrays', self.data['PLATFORM']['num_fantrays'], path) def create_led_device(self, key, ops): if ops['attr']=='all' or ops['attr']==self.data[key]['dev_info']['device_name']: path="pddf/devices/led" for attr in self.data[key]['i2c']['attr_list']: self.create_attr('device_name', self.data[key]['dev_info']['device_name'], path) self.create_device(self.data[key]['dev_attr'], path, ops) for attr_key in attr.keys(): if (attr_key == 'swpld_addr_offset' or attr_key == 'swpld_addr'): self.create_attr(attr_key, attr[attr_key], path) elif (attr_key != 'attr_name' and attr_key != 'descr' and attr_key != 'attr_devtype' and attr_key != 'attr_devname'): state_path=path+'/state_attr' self.create_attr(attr_key, attr[attr_key],state_path) cmd="echo '" + attr['attr_name']+"' > /sys/kernel/pddf/devices/led/dev_ops" self.runcmd(cmd) def led_parse(self, ops): getattr(self, ops['cmd']+"_led_platform_device")("PLATFORM", ops) for key in self.data.keys(): if key != 'PLATFORM' and 'dev_info' in self.data[key]: attr = self.data[key]['dev_info'] if attr['device_type'] == 'LED': getattr(self, ops['cmd']+"_led_device")(key, ops) def get_device_list(self, list, type): for key in self.data.keys(): if key != 'PLATFORM' and 'dev_info' in self.data[key]: attr = self.data[key]['dev_info'] if attr['device_type'] == type: list.append(self.data[key]) def create_pddf_devices(self): self.led_parse({"cmd": "create", "target": "all", "attr": "all"}) create_ret = 0 ret = self.dev_parse(self.data['SYSTEM'], {"cmd": "create", "target": "all", "attr": "all"}) if ret: if ret[0] != 0: return ret[0] if 'SYSSTATUS' in self.data: ret = self.dev_parse(self.data['SYSSTATUS'], {"cmd": "create", "target": "all", "attr": "all"}) if ret: if ret[0] != 0: return ret[0] return create_ret def delete_pddf_devices(self): self.dev_parse(self.data['SYSTEM'], {"cmd": "delete", "target": "all", "attr": "all"}) if 'SYSSTATUS' in self.data: self.dev_parse(self.data['SYSSTATUS'], {"cmd": "delete", "target": "all", "attr": "all"}) def populate_pddf_sysfsobj(self): self.dev_parse(self.data['SYSTEM'], {"cmd": "show", "target": "all", "attr": "all"}) if 'SYSSTATUS' in self.data: self.dev_parse(self.data['SYSSTATUS'], {"cmd": "show", "target": "all", "attr": "all"}) self.led_parse({"cmd": "show", "target": "all", "attr": "all"}) self.show_client_device() def cli_dump_dsysfs(self, component): self.dev_parse(self.data['SYSTEM'], {"cmd": "show_attr", "target": "all", "attr": "all"}) if 'SYSSTATUS' in self.data: self.dev_parse(self.data['SYSSTATUS'], {"cmd": "show_attr", "target": "all", "attr": "all"}) if component in self.data_sysfs_obj: return self.data_sysfs_obj[component] else: return None def validate_pddf_devices(self, *args): self.populate_pddf_sysfsobj() v_ops = {'cmd': 'validate', 'target': 'all', 'attr': 'all'} self.dev_parse(self.data['SYSTEM'], v_ops) ################################################################################################################## # BMC APIs ################################################################################################################## def populate_bmc_cache_db(self, bmc_attr): bmc_cmd = str(bmc_attr['bmc_cmd']).strip() if 'delimiter' in bmc_attr.keys(): delim = str(bmc_attr['delimiter']).strip() else: delim = None o_list = subprocess.check_output(bmc_cmd, shell=True).strip().split('\n') bmc_cache[bmc_cmd] = {} bmc_cache[bmc_cmd]['time'] = time.time() for entry in o_list: name = entry.split(delim)[0].strip() bmc_cache[bmc_cmd][name] = entry def non_raw_ipmi_get_request(self, bmc_attr): bmc_db_update_time = 1 value = 'N/A' bmc_cmd = str(bmc_attr['bmc_cmd']).strip() field_name = str(bmc_attr['field_name']).strip() field_pos = int(bmc_attr['field_pos'])-1 if 'delimiter' in bmc_attr.keys(): delim = str(bmc_attr['delimiter']).strip() else: delim = None if not bmc_cmd in bmc_cache: self.populate_bmc_cache_db(bmc_attr) else: now = time.time() if (int(now - bmc_cache[bmc_cmd]['time']) > bmc_db_update_time): self.populate_bmc_cache_db(bmc_attr) try: data = bmc_cache[bmc_cmd][field_name] value = data.split(delim)[field_pos].strip() except Exception as e: pass if 'mult' in bmc_attr.keys() and not value.isalpha(): if value.isalpha(): value = 0.0 value = float(value) * float(bmc_attr['mult']) return str(value) def raw_ipmi_get_request(self, bmc_attr): value = 'N/A' cmd = bmc_attr['bmc_cmd'] + " 2>/dev/null" if bmc_attr['type'] == 'raw': try: value = subprocess.check_output(cmd, shell=True).strip() except Exception as e: pass if value != 'N/A': value = str(int(value, 16)) return value if bmc_attr['type'] == 'mask': mask = int(bmc_attr['mask'].encode('utf-8'), 16) try: value = subprocess.check_output(cmd, shell=True).strip() except Exception as e: pass # value should either be '1' or '0' if value != 'N/A': value = '1' if bool(int(value, 16) & mask) else '0' return value if bmc_attr['type'] == 'ascii': try: value = subprocess.check_output(cmd, shell=True) except Exception as e: pass if value != 'N/A': tmp = ''.join(chr(int(i, 16)) for i in value.split()) tmp = "".join(i for i in unicode(tmp) if unicodedata.category(i)[0] != "C") value = str(tmp) return (value) return value def bmc_get_cmd(self, bmc_attr): if int(bmc_attr['raw']) == 1: value = self.raw_ipmi_get_request(bmc_attr) else: value = self.non_raw_ipmi_get_request(bmc_attr) return (value) def non_raw_ipmi_set_request(self, bmc_attr, val): value = 'N/A' # TODO: Implement it return value def raw_ipmi_set_request(self, bmc_attr, val): value = 'N/A' # TODO: Implement this return value def bmc_set_cmd(self, bmc_attr, val): if int(bmc_attr['raw']) == 1: value = self.raw_ipmi_set_request(bmc_attr, val) else: value = self.non_raw_ipmi_set_request(bmc_attr, val) return (value) # bmc-based attr: return attr obj # non-bmc-based attr: return empty obj def check_bmc_based_attr(self, device_name, attr_name): if device_name in self.data.keys(): if "bmc" in self.data[device_name].keys() and 'ipmitool' in self.data[device_name]['bmc'].keys(): attr_list = self.data[device_name]['bmc']['ipmitool']['attr_list'] for attr in attr_list: if attr['attr_name'].strip() == attr_name.strip(): return attr # Required attr_name is not supported in BMC object return {} return None def get_attr_name_output(self, device_name, attr_name): bmc_attr = self.check_bmc_based_attr(device_name, attr_name) output = {"mode": "", "status": ""} if bmc_attr is not None: if bmc_attr == {}: return {} output['mode'] = "bmc" output['status'] = self.bmc_get_cmd(bmc_attr) else: output['mode'] = "i2c" node = self.get_path(device_name, attr_name) if node is None: return {} try: with open(node, 'r') as f: output['status'] = f.read() except IOError: return {} return output def set_attr_name_output(self, device_name, attr_name, val): bmc_attr = self.check_bmc_based_attr(device_name, attr_name) output = {"mode": "", "status": ""} if bmc_attr is not None: if bmc_attr == {}: return {} output['mode'] = "bmc" output['status'] = False # No set operation allowed for BMC attributes as they are handled by BMC itself else: output['mode'] = "i2c" node = self.get_path(device_name, attr_name) if node is None: return {} try: with open(node, 'w') as f: f.write(str(val)) except IOError: return {} output['status'] = True return output ################################################################################################################### def main(): parser = argparse.ArgumentParser() parser.add_argument("--create", action='store_true', help="create the I2C topology") parser.add_argument("--sysfs", action='store', nargs="+", help="show access-attributes sysfs for the I2C topology") parser.add_argument("--dsysfs", action='store', nargs="+", help="show data-attributes sysfs for the I2C topology") parser.add_argument("--delete", action='store_true', help="Remove all the created I2C clients from topology") parser.add_argument("--validate", action='store', help="Validate the device specific attribute data elements") parser.add_argument("--schema", action='store', nargs="+", help="Schema Validation") parser.add_argument("--modules", action='store', nargs="+", help="Loaded modules validation") args = parser.parse_args() # Create the object try: pddf_obj = PddfParse() except Exception as e: print("%s" % str(e)) sys.exit() if args.create: pddf_obj.create_pddf_devices() if args.sysfs: if args.sysfs[0] == 'all': pddf_obj.populate_pddf_sysfsobj() if args.sysfs[0] == 'print': pddf_obj.populate_pddf_sysfsobj() pddf_obj.dump_sysfs_obj(pddf_obj.sysfs_obj, args.sysfs[1]) if args.sysfs[0] == 'validate': pddf_obj.populate_pddf_sysfsobj() pddf_obj.validate_sysfs_creation(pddf_obj.sysfs_obj, args.sysfs[1]) if args.sysfs[0] == 'verify': pddf_obj.verify_sysfs_data(args.sysfs[1]) if args.dsysfs: if args.dsysfs[0] == 'validate': pddf_obj.dev_parse(pddf_obj.data['SYSTEM'], {"cmd": "show_attr", "target": "all", "attr": "all"}) if 'SYSSTATUS' in pddf_obj.data: pddf_obj.dev_parse(pddf_obj.data['SYSSTATUS'], {"cmd": "show_attr", "target": "all", "attr": "all"}) pddf_obj.validate_dsysfs_creation(pddf_obj.data_sysfs_obj, args.dsysfs[1]) elif args.dsysfs[0] == 'print': pddf_obj.dev_parse(pddf_obj.data['SYSTEM'], {"cmd": "show_attr", "target": "all", "attr": "all"}) if 'SYSSTATUS' in pddf_obj.data: pddf_obj.dev_parse(pddf_obj.data['SYSSTATUS'], {"cmd": "show_attr", "target": "all", "attr": "all"}) pddf_obj.dump_sysfs_obj(pddf_obj.data_sysfs_obj, args.dsysfs[1]) elif args.dsysfs[0] == 'all': ret = pddf_obj.dev_parse(pddf_obj.data['SYSTEM'], {"cmd": "show_attr", "target": "all", "attr": "all"}) if 'SYSSTATUS' in pddf_obj.data: ret += pddf_obj.dev_parse(pddf_obj.data['SYSSTATUS'], {"cmd": "show_attr", "target": "all", "attr": "all"}) pddf_obj.dump_sysfs_obj(pddf_obj.data_sysfs_obj, 'all') else: val = pddf_obj.dev_parse(pddf_obj.data[args.dsysfs[0]], {"cmd": "show_attr", "target": args.dsysfs[0], "attr": args.dsysfs[1]}) for i in val: print(i) if args.delete: pddf_obj.delete_pddf_devices() if args.validate: if args.validate[0] == 'all': pddf_obj.validate_pddf_devices(args.validate[1:]) else: pass if args.schema: pddf_obj.schema_validation(args.schema[0]) if args.modules: pddf_obj.modules_validation(args.modules[0]) if __name__ == "__main__": main()