sonic-buildimage/files/image_config/backend_acl/backend_acl.py
Neetha John 0aacc4531a [storage_backend] Add backend acl service (#14229)
Why I did it
This PR addresses the issue mentioned above by loading the acl config as a service on a storage backend device

How I did it
The new acl service is a oneshot service that starts after swss and retries for a bounded time, waiting for the SWITCH_CAPABILITY info to be present, before attempting to load the acl rules. The service is also bound to the sonic targets, which ensures that it is restarted during minigraph reload and config reload.

How to verify it
Built an image with these changes and ran the following tests:

Verified that acl is loaded successfully on a storage backend device after a switch boot up
Verified that acl is loaded successfully on a storage backend ToR after minigraph load and config reload
Verified that acl is not loaded if the device is not a storage backend ToR or the device does not have a DATAACL table

Signed-off-by: Neetha John <nejo@microsoft.com>
2023-03-19 22:32:22 +08:00

95 lines
2.9 KiB
Python
Executable File

#!/usr/bin/env python
import os
import subprocess
import syslog
import time
from swsscommon.swsscommon import SonicV2Connector
# Identifier passed to syslog.openlog() for every message: this script's file name
SYSLOG_IDENTIFIER = os.path.basename(__file__)
# Absolute path of the sonic-cfggen executable invoked by the functions in this file
SONIC_CFGGEN_PATH = '/usr/local/bin/sonic-cfggen'
def log_info(msg):
    """
    Write msg to syslog at LOG_INFO priority, tagged with SYSLOG_IDENTIFIER
    """
    # the syslog connection is opened and closed around every single message
    syslog.openlog(SYSLOG_IDENTIFIER)
    syslog.syslog(syslog.LOG_INFO, msg)
    syslog.closelog()
def run_command(cmd, return_cmd=False):
    """
    Execute a command without a shell and optionally return its output

    Args:
        cmd: the command and its arguments, as a list
        return_cmd: when True, hand the command's stdout back to the caller

    Returns:
        When return_cmd is True: "Unknown" if the command exited with a
        non-zero status, otherwise the stripped, utf-8 decoded stdout, or
        None if the command printed nothing.
        When return_cmd is False: always None
    """
    log_info("executing cmd = {}".format(cmd))
    proc = subprocess.Popen(cmd, shell=False, stdout=subprocess.PIPE)
    # stderr is not piped, so the second value returned by communicate() is
    # always None and cannot be used to detect a failure; the exit status is
    # what tells us whether the command succeeded
    out, _ = proc.communicate()

    failed = proc.returncode != 0
    if failed:
        log_info("cmd = {} failed with exit code {}".format(cmd, proc.returncode))

    if not return_cmd:
        return None
    if failed:
        return "Unknown"
    if out:
        return out.strip().decode('utf-8')
    return None
def _get_device_type():
    """
    Return the value of DEVICE_METADATA.localhost.type reported by
    sonic-cfggen when invoked with the -m option
    """
    cfggen_cmd = [SONIC_CFGGEN_PATH, '-m', '-v', 'DEVICE_METADATA.localhost.type']
    return run_command(cfggen_cmd, return_cmd=True)
def _is_storage_device():
    """
    Tell whether sonic-cfggen reports DEVICE_METADATA.localhost.storage_device
    as the string "true"
    """
    cfggen_cmd = [SONIC_CFGGEN_PATH, '-d', '-v', 'DEVICE_METADATA.localhost.storage_device']
    flag = run_command(cfggen_cmd, return_cmd=True)
    return flag == "true"
def _is_acl_table_present():
    """
    Tell whether sonic-cfggen returns a non-empty value for ACL_TABLE.DATAACL
    """
    cfggen_cmd = [SONIC_CFGGEN_PATH, '-d', '-v', 'ACL_TABLE.DATAACL']
    table = run_command(cfggen_cmd, return_cmd=True)
    # no output at all (None or empty string) means there is no such table
    if not table:
        return False
    # "Unknown" is the value run_command uses to signal a failed query
    return table != "Unknown"
def _is_switch_table_present(timeout=120, step=10):
    """
    Wait for the 'SWITCH_CAPABILITY|switch' key to show up in STATE_DB

    The key is checked immediately and then once after every sleep of
    `step` seconds, including a last check once `timeout` seconds have been
    waited, so that the final sleep is never spent without a re-check.

    Args:
        timeout: maximum total time to sleep, in seconds
        step: time to sleep between two checks, in seconds; must be positive

    Returns:
        True as soon as the key exists, False if it is still missing after
        the timeout (a message is logged in that case)
    """
    state_db = SonicV2Connector(host='127.0.0.1')
    state_db.connect(state_db.STATE_DB, False)

    waited = 0
    while True:
        if state_db.exists(state_db.STATE_DB, 'SWITCH_CAPABILITY|switch'):
            return True
        if waited >= timeout:
            break
        time.sleep(step)
        waited += step

    log_info("Switch table not present")
    return False
def load_backend_acl(device_type):
    """
    Render the backend acl template and load the resulting acl rules

    Nothing is loaded, and a message is logged, unless the device is a
    storage device, has a DATAACL table and has the switch table in
    STATE_DB. The device_type argument is accepted but not used here.
    """
    template_path = os.path.join('/', "usr", "share", "sonic", "templates", "backend_acl.j2")
    acl_json_path = os.path.join('/', "etc", "sonic", "backend_acl.json")

    # this acl needs to be loaded only on a storage backend ToR. acl load will fail if the switch table isn't present
    eligible = _is_storage_device() and _is_acl_table_present() and _is_switch_table_present()
    if not eligible:
        log_info("Skipping backend acl load - conditions not met")
        return

    # render the template into the json file when the template is installed
    if os.path.isfile(template_path):
        render_arg = '{},{}'.format(template_path, acl_json_path)
        run_command(['sudo', SONIC_CFGGEN_PATH, '-d', '-t', render_arg])

    # load the rules only if the json file exists
    if os.path.isfile(acl_json_path):
        run_command(['acl-loader', 'update', 'incremental', acl_json_path])
def main():
    """
    Load the backend acl when the device type is BackEndToRRouter; on any
    other device type only log that the load is skipped
    """
    detected_type = _get_device_type()
    if detected_type == "BackEndToRRouter":
        load_backend_acl(detected_type)
    else:
        log_info("Skipping backend acl load on unsupported device type: {}".format(detected_type))
# run the acl load only when executed as a script, not when imported
if __name__ == "__main__":
    main()