[202012] Fix caclmgrd crash issue when applying scale cacl rules (#15630)

Cherry pick PR for https://github.com/sonic-net/sonic-host-services/pull/62

#### Why I did it
Fix the issue https://github.com/sonic-net/sonic-buildimage/issues/10883.

##### Work item tracking
- Microsoft ADO **(17795594)**:

#### How I did it
For performance reasons, libswsscommon is not thread-safe by design.
caclmgrd shared its Config DB connection across threads, so the child thread now creates and uses its own DB connector.

#### How to verify it
Load scale IPv4/IPv6 rules and verify that caclmgrd does not crash.
This commit is contained in:
Zhaohui Sun 2023-07-04 13:33:39 +08:00 committed by GitHub
parent d4515eecf8
commit 71a8a66894
No account linked to committer's email address
5 changed files with 1101 additions and 31 deletions

View File

@ -205,7 +205,7 @@ class ControlPlaneAclManager(daemon_base.DaemonBase):
tcp_flags_str = tcp_flags_str[:-1] tcp_flags_str = tcp_flags_str[:-1]
return tcp_flags_str return tcp_flags_str
def generate_block_ip2me_traffic_iptables_commands(self, namespace): def generate_block_ip2me_traffic_iptables_commands(self, namespace, config_db_connector):
INTERFACE_TABLE_NAME_LIST = [ INTERFACE_TABLE_NAME_LIST = [
"LOOPBACK_INTERFACE", "LOOPBACK_INTERFACE",
"MGMT_INTERFACE", "MGMT_INTERFACE",
@ -218,7 +218,7 @@ class ControlPlaneAclManager(daemon_base.DaemonBase):
# Add iptables rules to drop all packets destined for peer-to-peer interface IP addresses # Add iptables rules to drop all packets destined for peer-to-peer interface IP addresses
for iface_table_name in INTERFACE_TABLE_NAME_LIST: for iface_table_name in INTERFACE_TABLE_NAME_LIST:
iface_table = self.config_db_map[namespace].get_table(iface_table_name) iface_table = config_db_connector.get_table(iface_table_name)
if iface_table: if iface_table:
for key, _ in iface_table.items(): for key, _ in iface_table.items():
if not _ip_prefix_in_key(key): if not _ip_prefix_in_key(key):
@ -431,7 +431,7 @@ class ControlPlaneAclManager(daemon_base.DaemonBase):
self.log_info("Update DHCP chain: {}".format(insert_cmd)) self.log_info("Update DHCP chain: {}".format(insert_cmd))
def get_acl_rules_and_translate_to_iptables_commands(self, namespace): def get_acl_rules_and_translate_to_iptables_commands(self, namespace, config_db_connector):
""" """
Retrieves current ACL tables and rules from Config DB, translates Retrieves current ACL tables and rules from Config DB, translates
control plane ACLs into a list of iptables commands that can be run control plane ACLs into a list of iptables commands that can be run
@ -516,8 +516,8 @@ class ControlPlaneAclManager(daemon_base.DaemonBase):
iptables_cmds.append(self.iptables_cmd_ns_prefix[namespace] + "ip6tables -A INPUT -p tcp --sport 179 -j ACCEPT") iptables_cmds.append(self.iptables_cmd_ns_prefix[namespace] + "ip6tables -A INPUT -p tcp --sport 179 -j ACCEPT")
# Get current ACL tables and rules from Config DB # Get current ACL tables and rules from Config DB
self._tables_db_info = self.config_db_map[namespace].get_table(self.ACL_TABLE) self._tables_db_info = config_db_connector.get_table(self.ACL_TABLE)
self._rules_db_info = self.config_db_map[namespace].get_table(self.ACL_RULE) self._rules_db_info = config_db_connector.get_table(self.ACL_RULE)
num_ctrl_plane_acl_rules = 0 num_ctrl_plane_acl_rules = 0
@ -658,7 +658,7 @@ class ControlPlaneAclManager(daemon_base.DaemonBase):
service_to_source_ip_map.update({ acl_service:{ "ipv4":ipv4_src_ip_set, "ipv6":ipv6_src_ip_set } }) service_to_source_ip_map.update({ acl_service:{ "ipv4":ipv4_src_ip_set, "ipv6":ipv6_src_ip_set } })
# Add iptables commands to block ip2me traffic # Add iptables commands to block ip2me traffic
iptables_cmds += self.generate_block_ip2me_traffic_iptables_commands(namespace) iptables_cmds += self.generate_block_ip2me_traffic_iptables_commands(namespace, config_db_connector)
# Add iptables/ip6tables commands to allow all incoming packets with TTL of 0 or 1 # Add iptables/ip6tables commands to allow all incoming packets with TTL of 0 or 1
# This allows the device to respond to tools like tcptraceroute # This allows the device to respond to tools like tcptraceroute
@ -673,13 +673,13 @@ class ControlPlaneAclManager(daemon_base.DaemonBase):
return iptables_cmds, service_to_source_ip_map return iptables_cmds, service_to_source_ip_map
def update_control_plane_acls(self, namespace): def update_control_plane_acls(self, namespace, config_db_connector):
""" """
Convenience wrapper which retrieves current ACL tables and rules from Convenience wrapper which retrieves current ACL tables and rules from
Config DB, translates control plane ACLs into a list of iptables Config DB, translates control plane ACLs into a list of iptables
commands and runs them. commands and runs them.
""" """
iptables_cmds, service_to_source_ip_map = self.get_acl_rules_and_translate_to_iptables_commands(namespace) iptables_cmds, service_to_source_ip_map = self.get_acl_rules_and_translate_to_iptables_commands(namespace, config_db_connector)
self.log_info("Issuing the following iptables commands:") self.log_info("Issuing the following iptables commands:")
for cmd in iptables_cmds: for cmd in iptables_cmds:
self.log_info(" " + cmd) self.log_info(" " + cmd)
@ -714,30 +714,36 @@ class ControlPlaneAclManager(daemon_base.DaemonBase):
updates were received during the delay window, at which point it will update updates were received during the delay window, at which point it will update
iptables using the current ACL rules. iptables using the current ACL rules.
""" """
while True: try:
# Sleep for our delay interval # ConfigDBConnector is not multi thread safe. In child thread, we use another new DB connector.
time.sleep(self.UPDATE_DELAY_SECS) new_config_db_connector = swsscommon.ConfigDBConnector(use_unix_socket_path=True, namespace=namespace)
new_config_db_connector.connect()
while True:
# Sleep for our delay interval
time.sleep(self.UPDATE_DELAY_SECS)
with self.lock[namespace]: with self.lock[namespace]:
if self.num_changes[namespace] > num_changes: if self.num_changes[namespace] > num_changes:
# More ACL table changes occurred since this thread was spawned # More ACL table changes occurred since this thread was spawned
# spawn a new thread with the current number of changes # spawn a new thread with the current number of changes
new_changes = self.num_changes[namespace] - num_changes new_changes = self.num_changes[namespace] - num_changes
self.log_info("ACL config not stable for namespace '{}': {} changes detected in the past {} seconds. Skipping update ..." self.log_info("ACL config not stable for namespace '{}': {} changes detected in the past {} seconds. Skipping update ..."
.format(namespace, new_changes, self.UPDATE_DELAY_SECS)) .format(namespace, new_changes, self.UPDATE_DELAY_SECS))
num_changes = self.num_changes[namespace] num_changes = self.num_changes[namespace]
else:
if num_changes == self.num_changes[namespace] and num_changes > 0:
self.log_info("ACL config for namespace '{}' has not changed for {} seconds. Applying updates ..."
.format(namespace, self.UPDATE_DELAY_SECS))
self.update_control_plane_acls(namespace)
else: else:
self.log_error("Error updating ACLs for namespace '{}'".format(namespace)) if num_changes == self.num_changes[namespace] and num_changes > 0:
self.log_info("ACL config for namespace '{}' has not changed for {} seconds. Applying updates ..."
.format(namespace, self.UPDATE_DELAY_SECS))
self.update_control_plane_acls(namespace, new_config_db_connector)
else:
self.log_error("Error updating ACLs for namespace '{}'".format(namespace))
# Re-initialize # Re-initialize
self.num_changes[namespace] = 0 self.num_changes[namespace] = 0
self.update_thread[namespace] = None self.update_thread[namespace] = None
return return
finally:
new_config_db_connector.close("CONFIG_DB")
def allow_bfd_protocol(self, namespace): def allow_bfd_protocol(self, namespace):
iptables_cmds = [] iptables_cmds = []
@ -797,7 +803,7 @@ class ControlPlaneAclManager(daemon_base.DaemonBase):
# Loop through all asic namespaces (if present) and host namespace (DEFAULT_NAMESPACE) # Loop through all asic namespaces (if present) and host namespace (DEFAULT_NAMESPACE)
for namespace in list(self.config_db_map.keys()): for namespace in list(self.config_db_map.keys()):
# Unconditionally update control plane ACLs once at start on given namespace # Unconditionally update control plane ACLs once at start on given namespace
self.update_control_plane_acls(namespace) self.update_control_plane_acls(namespace, self.config_db_map[namespace])
# Connect to Config DB of given namespace # Connect to Config DB of given namespace
acl_db_connector = swsscommon.DBConnector("CONFIG_DB", 0, False, namespace) acl_db_connector = swsscommon.DBConnector("CONFIG_DB", 0, False, namespace)
# Subscribe to notifications when ACL tables changes # Subscribe to notifications when ACL tables changes

View File

@ -40,7 +40,7 @@ class TestCaclmgrdExternalClientAcl(TestCase):
self.caclmgrd.ControlPlaneAclManager.get_chain_list = mock.MagicMock(return_value=["INPUT", "FORWARD", "OUTPUT"]) self.caclmgrd.ControlPlaneAclManager.get_chain_list = mock.MagicMock(return_value=["INPUT", "FORWARD", "OUTPUT"])
caclmgrd_daemon = self.caclmgrd.ControlPlaneAclManager("caclmgrd") caclmgrd_daemon = self.caclmgrd.ControlPlaneAclManager("caclmgrd")
iptables_rules_ret, _ = caclmgrd_daemon.get_acl_rules_and_translate_to_iptables_commands('') iptables_rules_ret, _ = caclmgrd_daemon.get_acl_rules_and_translate_to_iptables_commands('', MockConfigDb())
self.assertEqual(set(test_data["return"]).issubset(set(iptables_rules_ret)), True) self.assertEqual(set(test_data["return"]).issubset(set(iptables_rules_ret)), True)
caclmgrd_daemon.iptables_cmd_ns_prefix['asic0'] = 'ip netns exec asic0' caclmgrd_daemon.iptables_cmd_ns_prefix['asic0'] = 'ip netns exec asic0'
caclmgrd_daemon.namespace_docker_mgmt_ip['asic0'] = '1.1.1.1' caclmgrd_daemon.namespace_docker_mgmt_ip['asic0'] = '1.1.1.1'

View File

@ -0,0 +1,51 @@
import os
import sys
import swsscommon
from parameterized import parameterized
from sonic_py_common.general import load_module_from_source
from unittest import TestCase, mock
from pyfakefs.fake_filesystem_unittest import patchfs
from .test_scale_vectors import CACLMGRD_SCALE_TEST_VECTOR
from tests.common.mock_configdb import MockConfigDb
# Path of database_config.json; test_caclmgrd_scale creates a fake file at
# this path when it does not already exist.
DBCONFIG_PATH = '/var/run/redis/sonic-db/database_config.json'
class TestCaclmgrdScale(TestCase):
    """
    Scale tests for caclmgrd: drive the ACL update path with large rule sets.
    """

    def setUp(self):
        """Swap in the mock Config DB connector and load the caclmgrd script."""
        # Must happen before the script is loaded/instantiated so that it
        # picks up the mock connector instead of the real one.
        swsscommon.swsscommon.ConfigDBConnector = MockConfigDb

        # Two directory levels above this file, then one more for the root
        # that holds the "scripts" directory.
        this_file = os.path.abspath(__file__)
        tests_root = os.path.dirname(os.path.dirname(this_file))
        repo_root = os.path.dirname(tests_root)
        sys.path.insert(0, repo_root)

        script_file = os.path.join(repo_root, "scripts", "caclmgrd")
        self.caclmgrd = load_module_from_source('caclmgrd', script_file)

    @parameterized.expand(CACLMGRD_SCALE_TEST_VECTOR)
    @patchfs
    def test_caclmgrd_scale(self, test_name, test_data, fs):
        """
        Run check_and_update_control_plane_acls against one scale test vector
        and verify the expected subprocess.Popen calls were issued.
        """
        # Provide a fake database_config.json when none is present.
        if not os.path.exists(DBCONFIG_PATH):
            fs.create_file(DBCONFIG_PATH)

        MockConfigDb.set_config_db(test_data["config_db"])

        with mock.patch("caclmgrd.subprocess") as subprocess_mock:
            # Stand-in for the object returned by subprocess.Popen.
            fake_proc = mock.Mock()
            fake_proc.configure_mock(**test_data["popen_attributes"])
            subprocess_mock.Popen.return_value = fake_proc
            subprocess_mock.PIPE = -1
            subprocess_mock.call.return_value = test_data["call_rc"]

            daemon = self.caclmgrd.ControlPlaneAclManager("caclmgrd")
            # Recorded change count and the count handed to the checker are
            # equal and non-zero.
            daemon.num_changes[''] = 150
            daemon.check_and_update_control_plane_acls('', 150)

            subprocess_mock.Popen.assert_has_calls(
                test_data["expected_subprocess_calls"], any_order=True
            )

File diff suppressed because it is too large Load Diff

View File

@ -27,6 +27,9 @@ class MockConfigDb(object):
def connect(self, wait_for_init=True, retry_on=True): def connect(self, wait_for_init=True, retry_on=True):
pass pass
def close(self, db_name):
pass
def get(self, db_id, key, field): def get(self, db_id, key, field):
return MockConfigDb.CONFIG_DB[key][field] return MockConfigDb.CONFIG_DB[key][field]