51a1eb112b
Signed-off-by: maipbui <maibui@microsoft.com> Dependency: [PR (#12065)](https://github.com/sonic-net/sonic-buildimage/pull/12065) needs to be merged first. #### Why I did it 1. `eval()` - not secure against maliciously constructed input; it can be dangerous if used to evaluate dynamic content. This may be a code injection vulnerability. 2. `subprocess()` - using it with `shell=True` is dangerous. Calling a subprocess function with a non-static string can lead to command injection. 3. `os` - not secure against maliciously constructed input and dangerous if used to evaluate dynamic content. 4. `is` operator - strings should not be compared using reference equality. 5. `globals()` - extremely dangerous because it may allow an attacker to execute arbitrary code on the system. #### How I did it 1. `eval()` - use `literal_eval()` 2. `subprocess()` - use `shell=False` instead and pass the command as a list of strings. Ref: [https://semgrep.dev/docs/cheat-sheets/python-command-injection/#mitigation](https://semgrep.dev/docs/cheat-sheets/python-command-injection/#mitigation) 3. `os` - use `subprocess` instead 4. `is` - replace with the `==` operator for value equality 5. `globals()` - avoid the use of `globals()`
112 lines
3.2 KiB
Python
112 lines
3.2 KiB
Python
import os
|
|
import struct
|
|
import subprocess
|
|
from mmap import *
|
|
from sonic_py_common.general import check_output_pipe
|
|
|
|
# Argument list launched by APIHelper.is_host(); if the executable cannot be
# found (FileNotFoundError), is_host() returns False.
HOST_CHK_CMD = ["docker"]
|
|
# Named empty-string constant. Not referenced in the visible part of this
# module -- presumably kept for importers; verify before removing.
EMPTY_STRING = ""
|
|
|
|
|
|
class APIHelper():
    """Collection of small helpers used by platform API code.

    Provides wrappers for probing the runtime environment, reading a 32-bit
    value from a memory-mapped resource file, running external commands,
    reading text files and invoking ``ipmitool``.

    Methods that return a ``(status, result)`` tuple signal failure through
    ``status`` (``False``) instead of raising; ``result`` is then ``""``
    unless noted otherwise.
    """

    def __init__(self):
        pass

    def is_host(self):
        """Return True if the ``HOST_CHK_CMD`` executable can be launched.

        Only the ability to start the command matters: its output is
        discarded and its exit status is ignored.

        Returns:
            bool: False when launching raises ``FileNotFoundError``,
            True otherwise.
        """
        try:
            subprocess.call(HOST_CHK_CMD, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
        except FileNotFoundError:
            return False
        return True

    def pci_get_value(self, resource, offset):
        """Read four bytes at *offset* from the memory-mapped file *resource*.

        Args:
            resource: Path of the file to open read/write and map.
            offset: Byte offset into the mapping; anything ``int()`` accepts.

        Returns:
            tuple: ``(True, values)`` where ``values`` is the 1-tuple produced
            by ``struct.unpack('I', ...)``, or ``(False, "")`` if the file
            cannot be opened or mapped, the offset is invalid, or fewer than
            four bytes are available at that offset.
        """
        status = True
        result = ""
        try:
            fd = os.open(resource, os.O_RDWR)
            try:
                # The mapping holds its own duplicate of the descriptor, so
                # both the mapping and fd must be released explicitly.
                with mmap(fd, 0) as mm:
                    mm.seek(int(offset))
                    read_data_stream = mm.read(4)
                    result = struct.unpack('I', read_data_stream)
            finally:
                # Close on success and failure alike so repeated calls do not
                # leak one descriptor each.
                os.close(fd)
        except Exception:
            status = False
        return status, result

    def run_command(self, cmd1_args, cmd2_args):
        """Run two commands through ``check_output_pipe`` and return its output.

        NOTE(review): ``check_output_pipe`` comes from
        ``sonic_py_common.general``; presumably it feeds the output of
        *cmd1_args* into *cmd2_args* -- confirm against that helper.

        Args:
            cmd1_args: Argument list of the first command.
            cmd2_args: Argument list of the second command.

        Returns:
            tuple: ``(True, output)`` on success, ``(False, "")`` when
            ``subprocess.CalledProcessError`` is raised.
        """
        status = True
        result = ""
        try:
            result = check_output_pipe(cmd1_args, cmd2_args)
        except subprocess.CalledProcessError:
            status = False
        return status, result

    def run_interactive_command(self, cmd):
        """Run *cmd* with the caller's stdin/stdout/stderr attached.

        Args:
            cmd: Command passed unchanged to ``subprocess.call``.

        Returns:
            bool: True if the command could be run (its exit status is
            ignored), False if running it raised.
        """
        try:
            subprocess.call(cmd)
        except (Exception, KeyboardInterrupt):
            # An interrupt (Ctrl-C) while the child runs is reported as a
            # failure rather than propagated, as callers have always seen.
            return False
        return True

    def read_txt_file(self, file_path):
        """Return the stripped text content of *file_path*.

        Args:
            file_path: Path of the text file to read.

        Returns:
            str or None: File content with surrounding whitespace removed,
            or None if the file cannot be opened or read (``IOError``).
        """
        try:
            with open(file_path, 'r') as fd:
                data = fd.read()
                return data.strip()
        except IOError:
            pass
        return None

    def _capture_stdout(self, cmd):
        """Run *cmd* and return ``(status, stripped stdout)``.

        The call counts as failed, with an empty result, when the command
        writes anything to stderr or when it cannot be run at all. The exit
        status is not inspected.
        """
        status = True
        result = ""
        try:
            p = subprocess.Popen(
                cmd, universal_newlines=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            raw_data, err = p.communicate()
            if err == '':
                result = raw_data.strip()
            else:
                status = False
        except Exception:
            status = False
        return status, result

    def ipmi_raw(self, netfn, cmd):
        """Run ``ipmitool raw <netfn> <cmd>``.

        Args:
            netfn: First argument after ``raw``; converted with ``str()``.
            cmd: Second argument after ``raw``; converted with ``str()``.

        Returns:
            tuple: ``(True, stripped stdout)``, or ``(False, "")`` if the
            tool wrote to stderr or could not be run.
        """
        return self._capture_stdout(["ipmitool", "raw", str(netfn), str(cmd)])

    def ipmi_fru_id(self, id, key=None):
        """Run ``ipmitool fru print <id>``, optionally filtered by *key*.

        Args:
            id: FRU identifier; converted with ``str()``.
            key: When truthy, the output is passed through ``grep <key>``
                via ``run_command``; otherwise the full output is returned.

        Returns:
            tuple: ``(status, result)`` as produced by ``run_command`` (with
            a key) or by running the tool directly (without one).
        """
        cmd1_args = ["ipmitool", "fru", "print", str(id)]
        if not key:
            return self._capture_stdout(cmd1_args)
        cmd2_args = ["grep", str(key)]
        return self.run_command(cmd1_args, cmd2_args)

    def ipmi_set_ss_thres(self, id, threshold_key, value):
        """Run ``ipmitool sensor thresh <id> <threshold_key> <value>``.

        Args:
            id: Sensor identifier; converted with ``str()``.
            threshold_key: Threshold selector; converted with ``str()``.
            value: Threshold value; converted with ``str()``.

        Returns:
            tuple: ``(True, stripped stdout)``, or ``(False, "")`` if the
            tool wrote to stderr or could not be run.
        """
        cmd = ["ipmitool", "sensor", "thresh", str(id), str(threshold_key), str(value)]
        return self._capture_stdout(cmd)
|