# sonic-buildimage/platform/mellanox/integration-scripts/helper.py
# (file-viewer residue from the original listing: 210 lines, 6.6 KiB, Python)

#
# Copyright (c) 2023 NVIDIA CORPORATION & AFFILIATES.
# Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import os
import glob
import re
# Prefix that a line must start with to be treated as a marker line
# (see FileHandler.find_marker_indices).
MARK_ID = "###->"

# Marker names; a section is delimited by "<marker>-start" / "<marker>-end".
MLNX_KFG_MARKER = "mellanox"
SDK_MARKER = "mellanox_sdk"
HW_MGMT_MARKER = "mellanox_hw_mgmt"

# Paths inside the sonic-linux-kernel patch directory.
SLK_PATCH_LOC = "src/sonic-linux-kernel/patch/"
SLK_KCONFIG = SLK_PATCH_LOC + "kconfig-inclusions"
SLK_KCONFIG_EXCLUDE = SLK_PATCH_LOC + "kconfig-exclusions"
SLK_SERIES = SLK_PATCH_LOC + "series"

# Paths for the non-upstream patches kept under the platform directory.
NON_UP_PATCH_DIR = "platform/mellanox/non-upstream-patches/"
NON_UP_PATCH_LOC = NON_UP_PATCH_DIR + "patches"
NON_UP_PATCH_DIFF = NON_UP_PATCH_DIR + "series.patch"

# Matches a "[header]" in a kconfig file and captures the header name.
# Raw string: "\[" and "\]" are invalid escapes in a normal string literal.
KCFG_HDR_RE = r"\[(.*)\]"

KERNEL_BACKPORTS = "kernel_backports"

# kconfig_inclusion headers to consider
HDRS = ["common", "amd64"]
class FileHandler:
    """Static helpers for reading and writing line-oriented text files.

    Several helpers operate on a "marker section": the lines between a line
    that starts with MARK_ID and contains "<marker>-start" and the following
    MARK_ID line that contains "<marker>-end".
    """

    @staticmethod
    def write_lines(path, lines, raw=False):
        """Write ``lines`` to ``path``, replacing any existing content.

        Args:
            path: Destination file. Missing parent directories are created.
            lines: Iterable of entries to write.
            raw: When True every entry is written verbatim; otherwise a
                newline is appended to each entry.
        """
        # A bare filename has an empty dirname and os.makedirs("") raises
        # FileNotFoundError, so only create the directory when there is one.
        parent = os.path.dirname(path)
        if parent:
            os.makedirs(parent, exist_ok=True)
        suffix = "" if raw else "\n"
        with open(path, 'w') as f:
            f.writelines(f"{line}{suffix}" for line in lines)

    @staticmethod
    def read_raw(path):
        """Return the lines of ``path`` as a list, line endings preserved."""
        with open(path) as im:
            return im.readlines()

    @staticmethod
    def read_strip(path, as_string=False):
        """Read ``path`` and strip surrounding whitespace from every line.

        Blank lines are kept (as empty strings).

        Returns:
            A list of stripped lines, or the lines joined with newlines when
            ``as_string`` is True.
        """
        data = [d.strip() for d in FileHandler.read_raw(path)]
        if as_string:
            return "\n".join(data)
        return data

    @staticmethod
    def read_strip_minimal(path, as_string=False, ignore_start_with="#"):
        """Read ``path``, dropping blank lines and comment lines.

        Every line is stripped; a line is dropped when it is empty or starts
        with ``ignore_start_with``.

        Returns:
            A list of the remaining lines, or those lines joined with
            newlines when ``as_string`` is True.
        """
        stripped = (l.strip() for l in FileHandler.read_raw(path))
        filtered_data = [
            l for l in stripped if l and not l.startswith(ignore_start_with)
        ]
        if as_string:
            return "\n".join(filtered_data)
        return filtered_data

    @staticmethod
    def read_dir(path, ext="*") -> list:
        """Return the base names of entries in ``path`` matching glob ``ext``."""
        return [os.path.basename(f) for f in glob.glob(os.path.join(path, ext))]

    @staticmethod
    def find_marker_indices(lines: list, marker=None) -> tuple:
        """Locate the first ``marker`` section in ``lines``.

        Returns:
            ``(i_start, i_end)``: the index of the "<marker>-start" line and
            of the first "<marker>-end" line after it. ``i_start`` is -1 when
            no start line exists and ``i_end`` is ``len(lines)`` when no end
            line follows the start, so callers can detect a missing section
            with ``i_start < 0 or i_end >= len(lines)``.
        """
        i_start = -1
        i_end = len(lines)
        if not marker:
            return (i_start, i_end)

        start_tag = marker + "-start"
        end_tag = marker + "-end"
        for index, line in enumerate(lines):
            if not line.strip().startswith(MARK_ID):
                continue
            if i_start < 0:
                if start_tag in line:
                    i_start = index
            elif end_tag in line:
                # Assumes one unique marker section per file; if several are
                # present only the first one is used. Stopping here also
                # guarantees i_end > i_start, so slices built from the pair
                # can never overlap or duplicate lines.
                i_end = index
                break
        return (i_start, i_end)

    @staticmethod
    def read_kconfig_inclusion(path, marker=MLNX_KFG_MARKER):
        """Return the stripped lines of ``path`` inside the ``marker`` section.

        With a falsy ``marker`` every stripped line of the file is returned.
        When the section cannot be found a warning is printed and an empty
        list is returned.
        """
        lines = FileHandler.read_strip(path)
        if not marker:
            return lines
        i_start, i_end = FileHandler.find_marker_indices(lines, marker)
        if i_start < 0 or i_end >= len(lines):
            print("-> WARNING No Marker Found")
            return []
        return lines[i_start+1:i_end]

    @staticmethod
    def write_lines_marker(path, writable_opts: list, marker=None):
        """Replace the content of the ``marker`` section of ``path``.

        The lines between the start and end marker lines are replaced by
        ``writable_opts`` (a newline is appended to each entry). When the
        section is not found, which is always the case for ``marker=None``,
        a warning is printed and the entries are appended to the end of the
        file after a blank separator line.
        """
        curr_data = FileHandler.read_raw(path)
        i_start, i_end = FileHandler.find_marker_indices(curr_data, marker)
        newline_writ_opts = [opt + "\n" for opt in writable_opts]
        if i_start < 0 or i_end >= len(curr_data):
            print("-> WARNING No Marker Found, writing data at the end of file")
            curr_data.extend(["\n"])
            curr_data.extend(newline_writ_opts)
        else:
            curr_data = curr_data[0:i_start+1] + newline_writ_opts + curr_data[i_end:]
        FileHandler.write_lines(path, curr_data, True)
        # Report what was actually written; re-reading the file before the
        # write would log the stale, pre-update content.
        print("-> INFO Written the following opts: \n{}".format("".join(curr_data)))

    @staticmethod
    def read_kconfig_parser(path) -> dict:
        """Group the non-comment lines of ``path`` by their "[header]".

        Returns:
            A dict shaped like
            ``{"no_parent": [...], "common": [...], "amd64": [...]}``.
            Lines that appear before any header are collected under
            "no_parent".
        """
        lines = FileHandler.read_strip_minimal(path)
        ret = {"no_parent": []}
        curr_hdr = ""
        for line in lines:
            match = re.search(KCFG_HDR_RE, line)
            if match:
                curr_hdr = match.group(1)
                # setdefault keeps entries already collected if the same
                # header shows up more than once in the file.
                ret.setdefault(curr_hdr, [])
            elif curr_hdr in ret:
                ret[curr_hdr].append(line)
            else:
                ret["no_parent"].append(line)
        return ret
class KCFG:
    """Helpers for converting kconfig option strings to and from tuples."""

    @staticmethod
    def parse_opt_str(opt: str) -> tuple:
        """Split a ``CONFIG_*=value`` string into ``(name, value)``.

        Only the first "=" separates name from value, so values that contain
        "=" themselves (e.g. ``CONFIG_CMDLINE="console=ttyS0"``) are kept
        intact.

        Returns:
            ``(name, value)``, or an empty tuple (after printing a debug
            message) when ``opt`` does not start with "CONFIG" or has no "=".
        """
        if not opt.startswith("CONFIG"):
            print("-> DEBUG: Malformed kconfig opt, {}".format(opt))
            return ()
        name, sep, value = opt.partition("=")
        if not sep:
            print("-> DEBUG: Malformed kconfig opt, {}".format(opt))
            return ()
        return (name, value)

    @staticmethod
    def parse_opts_strs(kcfg_sec: list) -> list:
        """Parse every entry of ``kcfg_sec``, skipping malformed ones.

        Returns:
            A list of ``(CONFIG_*, value)`` tuples in input order.
        """
        parsed = (KCFG.parse_opt_str(kcfg) for kcfg in kcfg_sec)
        return [opt for opt in parsed if opt]

    @staticmethod
    def get_writable_opts(opts):
        """Format ``(name, value)`` pairs back into ``name=value`` strings."""
        return ["{}={}".format(opt[0], opt[1]) for opt in opts]
class Action:
    """Minimal action object that stores its arguments.

    Both hooks are no-ops here; presumably they are meant to be overridden
    elsewhere (not visible in this file).
    """

    def __init__(self, args):
        # Keep the arguments as-is for later use by the hooks.
        self.args = args

    def perform(self):
        """Do nothing and return None."""
        return None

    def write_user_out(self):
        """Do nothing and return None."""
        return None
def build_commit_description(changes):
    """Render ``changes`` as a "Patch List" section for a commit message.

    Returns an empty string when ``changes`` is empty or None; otherwise a
    blank line, the section heading and one "* key : value" bullet per item,
    each terminated by a newline.
    """
    if not changes:
        return ""
    bullets = [f"* {name} : {ref}\n" for name, ref in changes.items()]
    return "".join(["\n", " ## Patch List\n", *bullets])
def parse_id(id_):
    """Turn a commit id into a torvalds/linux commit URL.

    Falsy values (None, "") are returned unchanged, the placeholder "N/A"
    becomes an empty string, and anything else is appended to the GitHub
    commit URL prefix.
    """
    if not id_:
        return id_
    if id_ == "N/A":
        return ""
    return "https://github.com/torvalds/linux/commit/" + id_