sonic-buildimage/device/celestica/x86_64-cel_e1031-r0/sonic_platform/event.py
Wirut Getbamrung 4bf873bf42
[device/celestica]: Fix failed test cases of Haliburton platform API (#7579)
- Why I did it
To fix failed test cases of Haliburton platform APIs that found on platform_tests script
- How I did it
Add device/celestica/x86_64-cel_e1031-r0/platform.json
Update functions to support python3.7
Add more functions follow latest sonic_platform_base
Fix the bug
- How to verify it
Run platform_tests script

Signed-off-by: Wirut Getbamrung [wgetbumr@celestica.com]
2021-07-23 06:18:39 -07:00

64 lines
2.1 KiB
Python

try:
import time
import select
from .common import Common
from sonic_py_common.logger import Logger
except ImportError as e:
raise ImportError(repr(e) + " - required module not found")
class SfpEvent:
''' Listen to insert/remove sfp events '''
SFP_NUM_START = 49
DELAY = 0.05
INT_PATH = '/sys/devices/platform/e1031.smc/SFP/modabs_int'
GPIO_SUS7 = '/sys/devices/platform/hlx-ich.0/sci_int_gpio_sus7'
def __init__(self, sfp_list):
self._api_common = Common()
self._sfp_list = sfp_list
self._logger = Logger()
# clear interrupt
self._api_common.read_one_line_file(self.INT_PATH)
def get_sfp_event(self, timeout):
epoll = select.epoll()
port_dict = {}
timeout_sec = timeout/1000
try:
# We get notified when there is an SCI interrupt from GPIO SUS7
fd = open(self.GPIO_SUS7, "r")
fd.read()
epoll.register(fd.fileno(), select.EPOLLIN & select.EPOLLET)
events = epoll.poll(timeout=timeout_sec if timeout != 0 else -1)
if events:
# Read the QSFP ABS interrupt & status registers
port_changes = self._api_common.read_one_line_file(
self.INT_PATH)
changes = int(port_changes, 16)
for sfp in self._sfp_list:
if sfp.port_num < self.SFP_NUM_START:
continue
change = (changes >> sfp.port_num-self.SFP_NUM_START) & 1
if change == 1:
time.sleep(self.DELAY)
port_status = sfp.get_presence()
port_dict[str(sfp.port_num)
] = '1' if port_status else '0'
return port_dict
except Exception as e:
self._logger.log_error("Failed to detect SfpEvent - " + repr(e))
return False
finally:
fd.close()
epoll.close()
return False