[device/marvell] Mitigation for security vulnerability (#11876)
#### Why I did it
The `os` and `commands` modules are not secure against maliciously constructed input. `getstatusoutput` is detected without a static string and uses `shell=True`.
#### How I did it
Eliminate the use of `os` and `commands`; use `subprocess` instead.
parent
6f2ddc5f49
commit
0bd3be32e6
@ -1,17 +1,13 @@
|
|||||||
try:
|
try:
|
||||||
import os
|
import os
|
||||||
import time
|
import time
|
||||||
import sys
|
|
||||||
import re
|
import re
|
||||||
|
import subprocess
|
||||||
from sonic_sfp.sfputilbase import SfpUtilBase
|
from sonic_sfp.sfputilbase import SfpUtilBase
|
||||||
|
from sonic_py_common.general import getstatusoutput_noshell
|
||||||
except ImportError as e:
|
except ImportError as e:
|
||||||
raise ImportError(str(e) + "- required module not found")
|
raise ImportError(str(e) + "- required module not found")
|
||||||
|
|
||||||
if sys.version_info[0] < 3:
|
|
||||||
import commands
|
|
||||||
else:
|
|
||||||
import subprocess as commands
|
|
||||||
|
|
||||||
smbus_present = 1
|
smbus_present = 1
|
||||||
|
|
||||||
try:
|
try:
|
||||||
@ -31,9 +27,10 @@ class SfpUtil(SfpUtilBase):
|
|||||||
_qsfp_ports = list(range(_port_start, ports_in_block + 1))
|
_qsfp_ports = list(range(_port_start, ports_in_block + 1))
|
||||||
|
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
os.system("modprobe i2c-dev")
|
subprocess.call(["modprobe", "i2c-dev"])
|
||||||
if not os.path.exists("/sys/bus/i2c/devices/0-0050"):
|
if not os.path.exists("/sys/bus/i2c/devices/0-0050"):
|
||||||
os.system("echo optoe2 0x50 > /sys/bus/i2c/devices/i2c-0/new_device")
|
with open("/sys/bus/i2c/devices/i2c-0/new_device", 'w') as file:
|
||||||
|
file.write("optoe2 0x50")
|
||||||
|
|
||||||
eeprom_path = '/sys/bus/i2c/devices/0-0050/eeprom'
|
eeprom_path = '/sys/bus/i2c/devices/0-0050/eeprom'
|
||||||
# for x in range(self.port _start, self.port_end +1):
|
# for x in range(self.port _start, self.port_end +1):
|
||||||
@ -74,8 +71,8 @@ class SfpUtil(SfpUtilBase):
|
|||||||
def i2c_get(self, device_addr, offset):
|
def i2c_get(self, device_addr, offset):
|
||||||
status = 0
|
status = 0
|
||||||
if smbus_present == 0:
|
if smbus_present == 0:
|
||||||
x = "i2cget -y 0 " + hex(device_addr) + " " + hex(offset)
|
x = ["i2cget", "-y", "0", hex(device_addr), hex(offset)]
|
||||||
cmdstatus, status = commands.getstatusoutput(x)
|
cmdstatus, status = getstatusoutput_noshell(x)
|
||||||
if cmdstatus != 0:
|
if cmdstatus != 0:
|
||||||
return cmdstatus
|
return cmdstatus
|
||||||
status = int(status, 16)
|
status = int(status, 16)
|
||||||
@ -86,8 +83,8 @@ class SfpUtil(SfpUtilBase):
|
|||||||
|
|
||||||
def i2c_set(self, device_addr, offset, value):
|
def i2c_set(self, device_addr, offset, value):
|
||||||
if smbus_present == 0:
|
if smbus_present == 0:
|
||||||
cmd = "i2cset -y 0 " + hex(device_addr) + " " + hex(offset) + " " + hex(value)
|
cmd = ["i2cset", "-y", "0", hex(device_addr), hex(offset), hex(value)]
|
||||||
os.system(cmd)
|
subprocess.call(cmd)
|
||||||
else:
|
else:
|
||||||
bus = smbus.SMBus(0)
|
bus = smbus.SMBus(0)
|
||||||
bus.write_byte_data(device_addr, offset, value)
|
bus.write_byte_data(device_addr, offset, value)
|
||||||
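Review bot: The `with open("/sys/bus/i2c/devices/i2c-0/new_device", 'w') as file:` block in `__init__` raises on failure, whereas the replaced `os.system("echo ...")` only returned a status that was ignored, so a rejected sysfs write now propagates out of the constructor. If best-effort behaviour is intended, wrap the block in `try:` / `except (IOError, OSError) as e:` and log the error; if failing hard is intended, state that in a comment above the `with`. The name `file` also shadows the Python 2 builtin, and the other SfpUtil variant in this commit uses `f`. The same write appears in the identical sfputil sections further down this page and, inside a per-port loop, in the `@ -66,8 +62,9` hunk, where one failure would skip the remaining ports unless it is caught outside the visible lines. `__init__` continues past the end of the hunk.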
|
@ -1,17 +1,13 @@
|
|||||||
try:
|
try:
|
||||||
import os
|
import os
|
||||||
import time
|
import time
|
||||||
import sys
|
|
||||||
import re
|
import re
|
||||||
|
import subprocess
|
||||||
from sonic_sfp.sfputilbase import SfpUtilBase
|
from sonic_sfp.sfputilbase import SfpUtilBase
|
||||||
|
from sonic_py_common.general import getstatusoutput_noshell
|
||||||
except ImportError as e:
|
except ImportError as e:
|
||||||
raise ImportError(str(e) + "- required module not found")
|
raise ImportError(str(e) + "- required module not found")
|
||||||
|
|
||||||
if sys.version_info[0] < 3:
|
|
||||||
import commands
|
|
||||||
else:
|
|
||||||
import subprocess as commands
|
|
||||||
|
|
||||||
smbus_present = 1
|
smbus_present = 1
|
||||||
|
|
||||||
try:
|
try:
|
||||||
@ -31,9 +27,10 @@ class SfpUtil(SfpUtilBase):
|
|||||||
_qsfp_ports = list(range(_port_start, ports_in_block + 1))
|
_qsfp_ports = list(range(_port_start, ports_in_block + 1))
|
||||||
|
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
os.system("modprobe i2c-dev")
|
subprocess.call(["modprobe", "i2c-dev"])
|
||||||
if not os.path.exists("/sys/bus/i2c/devices/0-0050"):
|
if not os.path.exists("/sys/bus/i2c/devices/0-0050"):
|
||||||
os.system("echo optoe2 0x50 > /sys/bus/i2c/devices/i2c-0/new_device")
|
with open("/sys/bus/i2c/devices/i2c-0/new_device", 'w') as file:
|
||||||
|
file.write("optoe2 0x50")
|
||||||
|
|
||||||
eeprom_path = '/sys/bus/i2c/devices/0-0050/eeprom'
|
eeprom_path = '/sys/bus/i2c/devices/0-0050/eeprom'
|
||||||
# for x in range(self.port _start, self.port_end +1):
|
# for x in range(self.port _start, self.port_end +1):
|
||||||
@ -74,8 +71,8 @@ class SfpUtil(SfpUtilBase):
|
|||||||
def i2c_get(self, device_addr, offset):
|
def i2c_get(self, device_addr, offset):
|
||||||
status = 0
|
status = 0
|
||||||
if smbus_present == 0:
|
if smbus_present == 0:
|
||||||
x = "i2cget -y 0 " + hex(device_addr) + " " + hex(offset)
|
x = ["i2cget", "-y", "0", hex(device_addr), hex(offset)]
|
||||||
cmdstatus, status = commands.getstatusoutput(x)
|
cmdstatus, status = getstatusoutput_noshell(x)
|
||||||
if cmdstatus != 0:
|
if cmdstatus != 0:
|
||||||
return cmdstatus
|
return cmdstatus
|
||||||
status = int(status, 16)
|
status = int(status, 16)
|
||||||
@ -86,8 +83,8 @@ class SfpUtil(SfpUtilBase):
|
|||||||
|
|
||||||
def i2c_set(self, device_addr, offset, value):
|
def i2c_set(self, device_addr, offset, value):
|
||||||
if smbus_present == 0:
|
if smbus_present == 0:
|
||||||
cmd = "i2cset -y 0 " + hex(device_addr) + " " + hex(offset) + " " + hex(value)
|
cmd = ["i2cset", "-y", "0", hex(device_addr), hex(offset), hex(value)]
|
||||||
os.system(cmd)
|
subprocess.call(cmd)
|
||||||
else:
|
else:
|
||||||
bus = smbus.SMBus(0)
|
bus = smbus.SMBus(0)
|
||||||
bus.write_byte_data(device_addr, offset, value)
|
bus.write_byte_data(device_addr, offset, value)
|
||||||
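Review bot: `subprocess.call(cmd)` in `i2c_set` discards the exit status, so a failed `i2cset` is silent on the `smbus_present == 0` path, while `i2c_get` in the preceding hunk checks `cmdstatus != 0`. Capture the status, e.g. `rc = subprocess.call(cmd)` followed by `if rc != 0:` with a log line or whatever error return the callers expect, so both helpers report failures. `subprocess.call(["modprobe", "i2c-dev"])` in `__init__` ignores its status in the same way. This applies equally to the identical sfputil sections elsewhere on this page; `i2c_set` ends outside the hunk, so its return convention cannot be confirmed from here.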
|
@ -1,9 +1,4 @@
|
|||||||
import sys
|
from sonic_py_common.general import getstatusoutput_noshell
|
||||||
import os.path
|
|
||||||
if sys.version_info[0] < 3:
|
|
||||||
import commands
|
|
||||||
else:
|
|
||||||
import subprocess as commands
|
|
||||||
|
|
||||||
smbus_present = 1
|
smbus_present = 1
|
||||||
try:
|
try:
|
||||||
@ -32,8 +27,7 @@ class PsuUtil(PsuBase):
|
|||||||
if index is None:
|
if index is None:
|
||||||
return False
|
return False
|
||||||
if smbus_present == 0:
|
if smbus_present == 0:
|
||||||
cmdstatus, psustatus = commands.getstatusoutput(
|
cmdstatus, psustatus = getstatusoutput_noshell(["i2cget", "-y", "0", "0x41", "0xa"])
|
||||||
'i2cget -y 0 0x41 0xa') # need to verify the cpld register logic
|
|
||||||
psustatus = int(psustatus, 16)
|
psustatus = int(psustatus, 16)
|
||||||
else:
|
else:
|
||||||
bus = smbus.SMBus(0)
|
bus = smbus.SMBus(0)
|
||||||
@ -56,8 +50,7 @@ class PsuUtil(PsuBase):
|
|||||||
return False
|
return False
|
||||||
|
|
||||||
if smbus_present == 0:
|
if smbus_present == 0:
|
||||||
cmdstatus, psustatus = commands.getstatusoutput(
|
cmdstatus, psustatus = getstatusoutput_noshell(["i2cget", "-y", "0", "0x41", "0xa"])
|
||||||
'i2cget -y 0 0x41 0xa') # need to verify the cpld register logic
|
|
||||||
psustatus = int(psustatus, 16)
|
psustatus = int(psustatus, 16)
|
||||||
else:
|
else:
|
||||||
bus = smbus.SMBus(0)
|
bus = smbus.SMBus(0)
|
||||||
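Review bot: `cmdstatus` from `getstatusoutput_noshell(["i2cget", "-y", "0", "0x41", "0xa"])` is never checked before `psustatus = int(psustatus, 16)`, so when `i2cget` fails the output is unlikely to be a hex value and `int()` raises `ValueError`. The sfputil `i2c_get` changed in this same commit guards with `if cmdstatus != 0:`; add the same guard here, e.g. `if cmdstatus != 0: return False`, matching the `return False` already used in the `index is None` branch. Both hunks in this file (`@ -32,8 +27,7` and `@ -56,8 +50,7`) have the pattern, as does the `sfpstatus` read in the `@ -113,8 +110,7` sfputil hunk. The enclosing methods start and end outside the hunks, so the right failure value should be confirmed against their callers.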
|
@ -2,17 +2,13 @@ try:
|
|||||||
import os
|
import os
|
||||||
import time
|
import time
|
||||||
import re
|
import re
|
||||||
import sys
|
|
||||||
import glob
|
import glob
|
||||||
|
import subprocess
|
||||||
from sonic_sfp.sfputilbase import SfpUtilBase
|
from sonic_sfp.sfputilbase import SfpUtilBase
|
||||||
|
from sonic_py_common.general import getstatusoutput_noshell
|
||||||
except ImportError as e:
|
except ImportError as e:
|
||||||
raise ImportError(str(e) + "- required module not found")
|
raise ImportError(str(e) + "- required module not found")
|
||||||
|
|
||||||
if sys.version_info[0] < 3:
|
|
||||||
import commands
|
|
||||||
else:
|
|
||||||
import subprocess as commands
|
|
||||||
|
|
||||||
smbus_present = 1
|
smbus_present = 1
|
||||||
|
|
||||||
try:
|
try:
|
||||||
@ -43,7 +39,7 @@ class SfpUtil(SfpUtilBase):
|
|||||||
|
|
||||||
# Enable optical SFP Tx
|
# Enable optical SFP Tx
|
||||||
if smbus_present == 0:
|
if smbus_present == 0:
|
||||||
os.system("i2cset -y -m 0x0f 0 0x41 0x5 0x00")
|
subprocess.call(["i2cset", "-y", "-m", "0x0f", "0", "0x41", "0x5", "0x00"])
|
||||||
else:
|
else:
|
||||||
bus = smbus.SMBus(0)
|
bus = smbus.SMBus(0)
|
||||||
DEVICE_ADDRESS = 0x41
|
DEVICE_ADDRESS = 0x41
|
||||||
@ -66,8 +62,9 @@ class SfpUtil(SfpUtilBase):
|
|||||||
port_eeprom_path = eeprom_path.format(self.port_to_i2c_mapping[x])
|
port_eeprom_path = eeprom_path.format(self.port_to_i2c_mapping[x])
|
||||||
#print port_eeprom_path
|
#print port_eeprom_path
|
||||||
if not os.path.exists(port_eeprom_path):
|
if not os.path.exists(port_eeprom_path):
|
||||||
bus_dev_path = bus_path.format(self.port_to_i2c_mapping[x])
|
bus_dev_path = bus_path.format(self.port_to_i2c_mapping[x]) + "/new_device"
|
||||||
os.system("echo optoe2 0x50 > " + bus_dev_path + "/new_device")
|
with open(bus_dev_path, 'w') as f:
|
||||||
|
f.write("optoe2 0x50")
|
||||||
self.port_to_eeprom_mapping[x] = port_eeprom_path
|
self.port_to_eeprom_mapping[x] = port_eeprom_path
|
||||||
self._port_to_eeprom_mapping[x] = port_eeprom_path
|
self._port_to_eeprom_mapping[x] = port_eeprom_path
|
||||||
SfpUtilBase.__init__(self)
|
SfpUtilBase.__init__(self)
|
||||||
@ -113,8 +110,7 @@ class SfpUtil(SfpUtilBase):
|
|||||||
pos = [1, 2, 4, 8]
|
pos = [1, 2, 4, 8]
|
||||||
bit_pos = pos[prt]
|
bit_pos = pos[prt]
|
||||||
if smbus_present == 0:
|
if smbus_present == 0:
|
||||||
cmdstatus, sfpstatus = commands.getstatusoutput(
|
cmdstatus, sfpstatus = getstatusoutput_noshell(['i2cget', '-y', '0', '0x41', '0x3'])
|
||||||
'i2cget -y 0 0x41 0x3') # need to verify the cpld register logic
|
|
||||||
sfpstatus = int(sfpstatus, 16)
|
sfpstatus = int(sfpstatus, 16)
|
||||||
else:
|
else:
|
||||||
bus = smbus.SMBus(0)
|
bus = smbus.SMBus(0)
|
||||||
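Review bot: The rewritten `i2cget` call in the `@ -113,8 +110,7` hunk drops the trailing comment `# need to verify the cpld register logic` that sat on the old call, so the open question about the CPLD register is no longer recorded next to the read. Keep it, either on its own line above the call or trailing it: `cmdstatus, sfpstatus = getstatusoutput_noshell(['i2cget', '-y', '0', '0x41', '0x3'])  # need to verify the cpld register logic`. The two psuutil hunks on this page lose the same comment. Separately, `bus_dev_path` in the `@ -66,8 +62,9` hunk now names the `new_device` file rather than the bus directory; a name such as `new_device_path` would describe it more accurately.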
|
@ -1,17 +1,13 @@
|
|||||||
try:
|
try:
|
||||||
import os
|
import os
|
||||||
import time
|
import time
|
||||||
import sys
|
|
||||||
import re
|
import re
|
||||||
|
import subprocess
|
||||||
from sonic_sfp.sfputilbase import SfpUtilBase
|
from sonic_sfp.sfputilbase import SfpUtilBase
|
||||||
|
from sonic_py_common.general import getstatusoutput_noshell
|
||||||
except ImportError as e:
|
except ImportError as e:
|
||||||
raise ImportError(str(e) + "- required module not found")
|
raise ImportError(str(e) + "- required module not found")
|
||||||
|
|
||||||
if sys.version_info[0] < 3:
|
|
||||||
import commands
|
|
||||||
else:
|
|
||||||
import subprocess as commands
|
|
||||||
|
|
||||||
smbus_present = 1
|
smbus_present = 1
|
||||||
|
|
||||||
try:
|
try:
|
||||||
@ -31,9 +27,10 @@ class SfpUtil(SfpUtilBase):
|
|||||||
_qsfp_ports = list(range(_port_start, ports_in_block + 1))
|
_qsfp_ports = list(range(_port_start, ports_in_block + 1))
|
||||||
|
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
os.system("modprobe i2c-dev")
|
subprocess.call(["modprobe", "i2c-dev"])
|
||||||
if not os.path.exists("/sys/bus/i2c/devices/0-0050"):
|
if not os.path.exists("/sys/bus/i2c/devices/0-0050"):
|
||||||
os.system("echo optoe2 0x50 > /sys/bus/i2c/devices/i2c-0/new_device")
|
with open("/sys/bus/i2c/devices/i2c-0/new_device", 'w') as file:
|
||||||
|
file.write("optoe2 0x50")
|
||||||
|
|
||||||
eeprom_path = '/sys/bus/i2c/devices/0-0050/eeprom'
|
eeprom_path = '/sys/bus/i2c/devices/0-0050/eeprom'
|
||||||
# for x in range(self.port _start, self.port_end +1):
|
# for x in range(self.port _start, self.port_end +1):
|
||||||
@ -74,8 +71,8 @@ class SfpUtil(SfpUtilBase):
|
|||||||
def i2c_get(self, device_addr, offset):
|
def i2c_get(self, device_addr, offset):
|
||||||
status = 0
|
status = 0
|
||||||
if smbus_present == 0:
|
if smbus_present == 0:
|
||||||
x = "i2cget -y 0 " + hex(device_addr) + " " + hex(offset)
|
x = ["i2cget", "-y", "0", hex(device_addr), hex(offset)]
|
||||||
cmdstatus, status = commands.getstatusoutput(x)
|
cmdstatus, status = getstatusoutput_noshell(x)
|
||||||
if cmdstatus != 0:
|
if cmdstatus != 0:
|
||||||
return cmdstatus
|
return cmdstatus
|
||||||
status = int(status, 16)
|
status = int(status, 16)
|
||||||
@ -86,8 +83,8 @@ class SfpUtil(SfpUtilBase):
|
|||||||
|
|
||||||
def i2c_set(self, device_addr, offset, value):
|
def i2c_set(self, device_addr, offset, value):
|
||||||
if smbus_present == 0:
|
if smbus_present == 0:
|
||||||
cmd = "i2cset -y 0 " + hex(device_addr) + " " + hex(offset) + " " + hex(value)
|
cmd = ["i2cset", "-y", "0", hex(device_addr), hex(offset), hex(value)]
|
||||||
os.system(cmd)
|
subprocess.call(cmd)
|
||||||
else:
|
else:
|
||||||
bus = smbus.SMBus(0)
|
bus = smbus.SMBus(0)
|
||||||
bus.write_byte_data(device_addr, offset, value)
|
bus.write_byte_data(device_addr, offset, value)
|
||||||
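Review bot: The argv lists (`["modprobe", "i2c-dev"]`, `["i2cget", "-y", "0", ...]`, `["i2cset", "-y", "0", ...]`) remove the shell, but `subprocess.call` still resolves the bare program name through the `PATH` of whatever process loads this plugin. This code loads a kernel module and writes to sysfs, so it presumably runs with elevated privileges, and a caller-controlled `PATH` would then decide which binary runs. Either give each tool an absolute path or add a comment above the first call stating that the plugin only runs with a fixed, trusted `PATH`. `getstatusoutput_noshell` is not shown on this page, so whether it resolves names the same way should be confirmed. The same calls appear in every sfputil section of this commit.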
|
@ -1,17 +1,13 @@
|
|||||||
try:
|
try:
|
||||||
import os
|
import os
|
||||||
import time
|
import time
|
||||||
import sys
|
|
||||||
import re
|
import re
|
||||||
|
import subprocess
|
||||||
from sonic_sfp.sfputilbase import SfpUtilBase
|
from sonic_sfp.sfputilbase import SfpUtilBase
|
||||||
|
from sonic_py_common.general import getstatusoutput_noshell
|
||||||
except ImportError as e:
|
except ImportError as e:
|
||||||
raise ImportError(str(e) + "- required module not found")
|
raise ImportError(str(e) + "- required module not found")
|
||||||
|
|
||||||
if sys.version_info[0] < 3:
|
|
||||||
import commands
|
|
||||||
else:
|
|
||||||
import subprocess as commands
|
|
||||||
|
|
||||||
smbus_present = 1
|
smbus_present = 1
|
||||||
|
|
||||||
try:
|
try:
|
||||||
@ -31,9 +27,10 @@ class SfpUtil(SfpUtilBase):
|
|||||||
_qsfp_ports = list(range(_port_start, ports_in_block + 1))
|
_qsfp_ports = list(range(_port_start, ports_in_block + 1))
|
||||||
|
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
os.system("modprobe i2c-dev")
|
subprocess.call(["modprobe", "i2c-dev"])
|
||||||
if not os.path.exists("/sys/bus/i2c/devices/0-0050"):
|
if not os.path.exists("/sys/bus/i2c/devices/0-0050"):
|
||||||
os.system("echo optoe2 0x50 > /sys/bus/i2c/devices/i2c-0/new_device")
|
with open("/sys/bus/i2c/devices/i2c-0/new_device", 'w') as file:
|
||||||
|
file.write("optoe2 0x50")
|
||||||
|
|
||||||
eeprom_path = '/sys/bus/i2c/devices/0-0050/eeprom'
|
eeprom_path = '/sys/bus/i2c/devices/0-0050/eeprom'
|
||||||
# for x in range(self.port _start, self.port_end +1):
|
# for x in range(self.port _start, self.port_end +1):
|
||||||
@ -74,8 +71,8 @@ class SfpUtil(SfpUtilBase):
|
|||||||
def i2c_get(self, device_addr, offset):
|
def i2c_get(self, device_addr, offset):
|
||||||
status = 0
|
status = 0
|
||||||
if smbus_present == 0:
|
if smbus_present == 0:
|
||||||
x = "i2cget -y 0 " + hex(device_addr) + " " + hex(offset)
|
x = ["i2cget", "-y", "0", hex(device_addr), hex(offset)]
|
||||||
cmdstatus, status = commands.getstatusoutput(x)
|
cmdstatus, status = getstatusoutput_noshell(x)
|
||||||
if cmdstatus != 0:
|
if cmdstatus != 0:
|
||||||
return cmdstatus
|
return cmdstatus
|
||||||
status = int(status, 16)
|
status = int(status, 16)
|
||||||
@ -86,8 +83,8 @@ class SfpUtil(SfpUtilBase):
|
|||||||
|
|
||||||
def i2c_set(self, device_addr, offset, value):
|
def i2c_set(self, device_addr, offset, value):
|
||||||
if smbus_present == 0:
|
if smbus_present == 0:
|
||||||
cmd = "i2cset -y 0 " + hex(device_addr) + " " + hex(offset) + " " + hex(value)
|
cmd = ["i2cset", "-y", "0", hex(device_addr), hex(offset), hex(value)]
|
||||||
os.system(cmd)
|
subprocess.call(cmd)
|
||||||
else:
|
else:
|
||||||
bus = smbus.SMBus(0)
|
bus = smbus.SMBus(0)
|
||||||
bus.write_byte_data(device_addr, offset, value)
|
bus.write_byte_data(device_addr, offset, value)
|
||||||
|