[device/delta] Mitigation for command injection vulnerability (#11865)
#### Why I did it `os` execution functions are not secure against maliciously constructed input. #### How I did it Use `subprocess` module
This commit is contained in:
parent
fad4034000
commit
ea101a90d5
@ -1,4 +1,4 @@
|
|||||||
import os.path
|
import subprocess
|
||||||
|
|
||||||
try:
|
try:
|
||||||
from sonic_psu.psu_base import PsuBase
|
from sonic_psu.psu_base import PsuBase
|
||||||
@ -14,7 +14,7 @@ class PsuUtil(PsuBase):
|
|||||||
|
|
||||||
self.psu_path = "/sys/bus/i2c/devices/6-00{}/"
|
self.psu_path = "/sys/bus/i2c/devices/6-00{}/"
|
||||||
self.psu_oper_status = "in1_input"
|
self.psu_oper_status = "in1_input"
|
||||||
self.psu_presence = "i2cget -y 6 0x{} 0x00"
|
self.psu_presence = ["i2cget", "-y", "6", "", "0x00"]
|
||||||
|
|
||||||
def get_num_psus(self):
|
def get_num_psus(self):
|
||||||
"""
|
"""
|
||||||
@ -44,9 +44,10 @@ class PsuUtil(PsuBase):
|
|||||||
return False
|
return False
|
||||||
Base_bus_number = 49
|
Base_bus_number = 49
|
||||||
status = 0
|
status = 0
|
||||||
|
self.psu_presence[3] = "0x" + str(index + Base_bus_number)
|
||||||
try:
|
try:
|
||||||
p = os.popen(self.psu_presence.format(index + Base_bus_number) + "> /dev/null 2>&1")
|
p = subprocess.Popen(self.psu_presence, stdout=subprocess.PIPE, universal_newlines=True)
|
||||||
if p.readline() != None:
|
if p.stdout.readline() != None:
|
||||||
status = 1
|
status = 1
|
||||||
p.close()
|
p.close()
|
||||||
except IOError:
|
except IOError:
|
||||||
|
@ -1,4 +1,4 @@
|
|||||||
import os.path
|
import subprocess
|
||||||
|
|
||||||
try:
|
try:
|
||||||
from sonic_psu.psu_base import PsuBase
|
from sonic_psu.psu_base import PsuBase
|
||||||
@ -14,7 +14,7 @@ class PsuUtil(PsuBase):
|
|||||||
|
|
||||||
self.psu_path = "/sys/bus/i2c/devices/{}-0058/"
|
self.psu_path = "/sys/bus/i2c/devices/{}-0058/"
|
||||||
self.psu_oper_status = "in1_input"
|
self.psu_oper_status = "in1_input"
|
||||||
self.psu_presence = "i2cget -y {} 0x50 0x00"
|
self.psu_presence = ["i2cget", "-y", "", "0x50", "0x00"]
|
||||||
|
|
||||||
def get_num_psus(self):
|
def get_num_psus(self):
|
||||||
"""
|
"""
|
||||||
@ -45,9 +45,10 @@ class PsuUtil(PsuBase):
|
|||||||
return False
|
return False
|
||||||
Base_bus_number = 39
|
Base_bus_number = 39
|
||||||
status = 0
|
status = 0
|
||||||
|
self.psu_presence[2] = str(index + Base_bus_number)
|
||||||
try:
|
try:
|
||||||
p = os.popen(self.psu_presence.format(index + Base_bus_number) + "> /dev/null 2>&1")
|
p = subprocess.Popen(self.psu_presence, stdout=subprocess.PIPE, universal_newlines=True)
|
||||||
if p.readline() != None:
|
if p.stdout.readline() != None:
|
||||||
status = 1
|
status = 1
|
||||||
p.close()
|
p.close()
|
||||||
except IOError:
|
except IOError:
|
||||||
|
@ -3,7 +3,6 @@
|
|||||||
# provides the PSUs status which are available in the platform
|
# provides the PSUs status which are available in the platform
|
||||||
#
|
#
|
||||||
|
|
||||||
import os.path
|
|
||||||
import subprocess
|
import subprocess
|
||||||
|
|
||||||
try:
|
try:
|
||||||
@ -38,8 +37,9 @@ class PsuUtil(PsuBase):
|
|||||||
return False
|
return False
|
||||||
status = 0
|
status = 0
|
||||||
try:
|
try:
|
||||||
p = os.popen("ipmitool raw 0x38 0x2 3 0x6a 0x3 1")
|
cmd = ["ipmitool", "raw", "0x38", "0x2", "3", "0x6a", "0x3", "1"]
|
||||||
content = p.readline().rstrip()
|
p = subprocess.Popen(cmd, stdout=subprocess.PIPE, universal_newlines=True)
|
||||||
|
content = p.stdout.readline().rstrip()
|
||||||
reg_value = int(content, 16)
|
reg_value = int(content, 16)
|
||||||
if index == 1:
|
if index == 1:
|
||||||
mask = (1 << 6)
|
mask = (1 << 6)
|
||||||
@ -66,8 +66,9 @@ class PsuUtil(PsuBase):
|
|||||||
|
|
||||||
status = 0
|
status = 0
|
||||||
try:
|
try:
|
||||||
p = os.popen("ipmitool raw 0x38 0x2 3 0x6a 0x3 1")
|
cmd = ["ipmitool", "raw", "0x38", "0x2", "3", "0x6a", "0x3", "1"]
|
||||||
content = p.readline().rstrip()
|
p = subprocess.Popen(cmd, stdout=subprocess.PIPE, universal_newlines=True)
|
||||||
|
content = p.stdout.readline().rstrip()
|
||||||
reg_value = int(content, 16)
|
reg_value = int(content, 16)
|
||||||
if index == 1:
|
if index == 1:
|
||||||
mask = (1 << 7)
|
mask = (1 << 7)
|
||||||
|
@ -3,7 +3,6 @@
|
|||||||
# provides the PSUs status which are available in the platform
|
# provides the PSUs status which are available in the platform
|
||||||
#
|
#
|
||||||
|
|
||||||
import os.path
|
|
||||||
import subprocess
|
import subprocess
|
||||||
|
|
||||||
try:
|
try:
|
||||||
@ -17,8 +16,8 @@ class PsuUtil(PsuBase):
|
|||||||
|
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
PsuBase.__init__(self)
|
PsuBase.__init__(self)
|
||||||
self.psu_presence = "cat /sys/devices/platform/delta-ag9064-cpld.0/psu{}_scan"
|
self.psu_presence = "/sys/devices/platform/delta-ag9064-cpld.0/psu{}_scan"
|
||||||
self.psu_status = "cat /sys/devices/platform/delta-ag9064-swpld1.0/psu{}_pwr_ok"
|
self.psu_status = "/sys/devices/platform/delta-ag9064-swpld1.0/psu{}_pwr_ok"
|
||||||
|
|
||||||
def get_num_psus(self):
|
def get_num_psus(self):
|
||||||
"""
|
"""
|
||||||
@ -40,8 +39,9 @@ class PsuUtil(PsuBase):
|
|||||||
return False
|
return False
|
||||||
|
|
||||||
status = 0
|
status = 0
|
||||||
|
self.psu_status = self.psu_status.format(index)
|
||||||
try:
|
try:
|
||||||
p = os.popen(self.psu_status.format(index))
|
p = open(self.psu_status, 'r')
|
||||||
content = p.readline().rstrip()
|
content = p.readline().rstrip()
|
||||||
reg_value = int(content)
|
reg_value = int(content)
|
||||||
if reg_value != 0:
|
if reg_value != 0:
|
||||||
@ -63,8 +63,9 @@ class PsuUtil(PsuBase):
|
|||||||
if index is None:
|
if index is None:
|
||||||
return False
|
return False
|
||||||
status = 0
|
status = 0
|
||||||
|
self.psu_presence = self.psu_presence.format(index)
|
||||||
try:
|
try:
|
||||||
p = os.popen(self.psu_presence.format(index))
|
p = open(self.psu_presence, 'r')
|
||||||
content = p.readline().rstrip()
|
content = p.readline().rstrip()
|
||||||
reg_value = int(content, 16)
|
reg_value = int(content, 16)
|
||||||
if reg_value != 0:
|
if reg_value != 0:
|
||||||
|
@ -1,4 +1,4 @@
|
|||||||
import os.path
|
import subprocess
|
||||||
|
|
||||||
try:
|
try:
|
||||||
from sonic_psu.psu_base import PsuBase
|
from sonic_psu.psu_base import PsuBase
|
||||||
@ -15,7 +15,7 @@ class PsuUtil(PsuBase):
|
|||||||
self.psu_path = "/sys/bus/i2c/devices/{}-0058/"
|
self.psu_path = "/sys/bus/i2c/devices/{}-0058/"
|
||||||
self.psu_oper_status = "in1_input"
|
self.psu_oper_status = "in1_input"
|
||||||
self.psu_oper_status2 = "in2_input"
|
self.psu_oper_status2 = "in2_input"
|
||||||
self.psu_presence = "i2cget -y {} 0x50 0x00"
|
self.psu_presence = ["i2cget", "-y", "", "0x50", "0x00"]
|
||||||
|
|
||||||
def get_num_psus(self):
|
def get_num_psus(self):
|
||||||
"""
|
"""
|
||||||
@ -50,9 +50,10 @@ class PsuUtil(PsuBase):
|
|||||||
return False
|
return False
|
||||||
Base_bus_number = 0
|
Base_bus_number = 0
|
||||||
status = 0
|
status = 0
|
||||||
|
self.psu_presence[2] = str(index + Base_bus_number)
|
||||||
try:
|
try:
|
||||||
p = os.popen(self.psu_presence.format(index + Base_bus_number) + "> /dev/null 2>&1")
|
p = subprocess.Popen(self.psu_presence, stdout=subprocess.PIPE, universal_newlines=True)
|
||||||
if p.readline() != None:
|
if p.stdout.readline() != None:
|
||||||
status = 1
|
status = 1
|
||||||
p.close()
|
p.close()
|
||||||
except IOError:
|
except IOError:
|
||||||
|
@ -3,7 +3,6 @@
|
|||||||
# provides the PSUs status which are available in the platform
|
# provides the PSUs status which are available in the platform
|
||||||
#
|
#
|
||||||
|
|
||||||
import os.path
|
|
||||||
import subprocess
|
import subprocess
|
||||||
|
|
||||||
try:
|
try:
|
||||||
@ -17,7 +16,7 @@ class PsuUtil(PsuBase):
|
|||||||
|
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
PsuBase.__init__(self)
|
PsuBase.__init__(self)
|
||||||
self.psu_status = "ipmitool raw 0x38 0x1 {} 0x50"
|
self.psu_status = ["ipmitool", "raw", "0x38", "0x1", "", "0x50"]
|
||||||
|
|
||||||
def get_num_psus(self):
|
def get_num_psus(self):
|
||||||
"""
|
"""
|
||||||
@ -39,9 +38,10 @@ class PsuUtil(PsuBase):
|
|||||||
return False
|
return False
|
||||||
|
|
||||||
status = 0
|
status = 0
|
||||||
|
cmd = ["ipmitool", "raw", "0x38", "0x2", "7", "0x32", "0x28", "1"]
|
||||||
try:
|
try:
|
||||||
p = os.popen("ipmitool raw 0x38 0x2 7 0x32 0x28 1")
|
p = subprocess.Popen(cmd, stdout=subprocess.PIPE, universal_newlines=True)
|
||||||
content = p.readline().rstrip()
|
content = p.stdout.readline().rstrip()
|
||||||
reg_value = int(content, 16)
|
reg_value = int(content, 16)
|
||||||
mask = (1 << (8 - index))
|
mask = (1 << (8 - index))
|
||||||
if reg_value & mask == 0:
|
if reg_value & mask == 0:
|
||||||
@ -63,9 +63,10 @@ class PsuUtil(PsuBase):
|
|||||||
if index is None:
|
if index is None:
|
||||||
return False
|
return False
|
||||||
status = 0
|
status = 0
|
||||||
|
self.psu_status[4] = str(index-1)
|
||||||
try:
|
try:
|
||||||
p = os.popen(self.psu_status.format(index - 1))
|
p = subprocess.Popen(self.psu_status, stdout=subprocess.PIPE, universal_newlines=True)
|
||||||
content = p.readline().rstrip()
|
content = p.stdout.readline().rstrip()
|
||||||
reg_value = int(content, 16)
|
reg_value = int(content, 16)
|
||||||
if reg_value != 0:
|
if reg_value != 0:
|
||||||
return False
|
return False
|
||||||
|
Loading…
Reference in New Issue
Block a user