[hostcfgd] [202106] Fixed the brief blackout in hostcfgd using SubscriberStateTable (#9031)

#### Why I did it
Ported https://github.com/Azure/sonic-buildimage/pull/8861 to 202106 as it couldn't be cherry-picked cleanly
This commit is contained in:
Vivek Reddy 2021-10-29 08:55:59 -07:00 committed by GitHub
parent b125f5d564
commit 91628135e9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 656 additions and 265 deletions

View File

@ -4,12 +4,15 @@ import ast
import copy
import ipaddress
import os
import sys
import subprocess
import syslog
import signal
import jinja2
from sonic_py_common import device_info
from swsscommon.swsscommon import ConfigDBConnector
from swsscommon.swsscommon import SubscriberStateTable, DBConnector, Select
from swsscommon.swsscommon import ConfigDBConnector, TableConsumable
# FILE
PAM_AUTH_CONF = "/etc/pam.d/common-auth-sonic"
@ -36,6 +39,33 @@ RADIUS_SERVER_TIMEOUT_DEFAULT = "5"
RADIUS_SERVER_AUTH_TYPE_DEFAULT = "pap"
RADIUS_PAM_AUTH_CONF_DIR = "/etc/pam_radius_auth.d/"
# MISC Constants
CFG_DB = "CONFIG_DB"
HOSTCFGD_MAX_PRI = 10 # Used to enforce ordering b/w daemons under Hostcfgd
DEFAULT_SELECT_TIMEOUT = 1000
def safe_eval(val, default_value=False):
    """Safely evaluate a Python literal expression.

    Args:
        val: string containing a Python literal (e.g. 'True', '5', '[1, 2]').
        default_value: value to return when *val* cannot be parsed.

    Returns:
        The evaluated literal, or *default_value* on parse failure.
    """
    try:
        ret = ast.literal_eval(val)
    except (ValueError, SyntaxError):
        # literal_eval raises ValueError for non-literal expressions and
        # SyntaxError for malformed input; treat both as "not parseable"
        # instead of letting SyntaxError crash the daemon.
        ret = default_value
    return ret
def signal_handler(sig, frame):
    """Handle process signals: ignore SIGHUP, exit on SIGINT/SIGTERM.

    Exits with the conventional 128 + signal-number status code.
    """
    if sig == signal.SIGHUP:
        syslog.syslog(syslog.LOG_INFO, "HostCfgd: signal 'SIGHUP' is caught and ignoring..")
        return
    if sig == signal.SIGINT:
        syslog.syslog(syslog.LOG_INFO, "HostCfgd: signal 'SIGINT' is caught and exiting...")
        sys.exit(128 + sig)
    if sig == signal.SIGTERM:
        syslog.syslog(syslog.LOG_INFO, "HostCfgd: signal 'SIGTERM' is caught and exiting...")
        sys.exit(128 + sig)
    # Any other signal is unexpected; log and keep running.
    syslog.syslog(syslog.LOG_INFO, "HostCfgd: invalid signal - ignoring..")
def run_cmd(cmd, log_err=True, raise_exception=False):
try:
@ -86,9 +116,9 @@ class Feature(object):
self.name = feature_name
self.state = self._get_target_state(feature_cfg.get('state'), device_config or {})
self.auto_restart = feature_cfg.get('auto_restart', 'disabled')
self.has_timer = ast.literal_eval(feature_cfg.get('has_timer', 'False'))
self.has_global_scope = ast.literal_eval(feature_cfg.get('has_global_scope', 'True'))
self.has_per_asic_scope = ast.literal_eval(feature_cfg.get('has_per_asic_scope', 'False'))
self.has_timer = safe_eval(feature_cfg.get('has_timer', 'False'))
self.has_global_scope = safe_eval(feature_cfg.get('has_global_scope', 'True'))
self.has_per_asic_scope = safe_eval(feature_cfg.get('has_per_asic_scope', 'False'))
def _get_target_state(self, state_configuration, device_config):
""" Returns the target state for the feature by rendering the state field as J2 template.
@ -109,6 +139,14 @@ class Feature(object):
raise ValueError('Invalid state rendered for feature {}: {}'.format(self.name, target_state))
return target_state
def compare_state(self, feature_name, feature_cfg):
    """Return True when *feature_cfg* matches this feature's cached state.

    Only the 'state' field is compared; a name mismatch or a non-dict
    config is treated as different.
    """
    if feature_name != self.name:
        return False
    if not isinstance(feature_cfg, dict):
        return False
    return self.state == feature_cfg.get('state', '')
class FeatureHandler(object):
""" Handles FEATURE table updates. """
@ -122,7 +160,7 @@ class FeatureHandler(object):
self._cached_config = {}
self.is_multi_npu = device_info.is_multi_npu()
def handle(self, feature_name, feature_cfg):
def handle(self, feature_name, op, feature_cfg):
if not feature_cfg:
self._cached_config.pop(feature_name)
syslog.syslog(syslog.LOG_INFO, "Deregistering feature {}".format(feature_name))
@ -136,9 +174,7 @@ class FeatureHandler(object):
# the next called self.update_feature_state will start it again. If it will fail
# again the auto restart will kick-in. Another order may leave it in failed state
# and not auto restart.
if self._cached_config[feature_name].auto_restart != feature.auto_restart:
self.update_feature_auto_restart(feature)
self._cached_config[feature_name].auto_restart = feature.auto_restart
self.update_feature_auto_restart(feature, feature_name)
# Enable/disable the container service if the feature state was changed from its previous state.
if self._cached_config[feature_name].state != feature.state:
@ -147,7 +183,12 @@ class FeatureHandler(object):
else:
self.resync_feature_state(self._cached_config[feature_name])
def update_all_features_config(self):
def sync_state_field(self):
"""
Summary:
Updates the state field in the FEATURE|* tables as the state field
might have to be rendered based on DEVICE_METADATA table
"""
feature_table = self._config_db.get_table('FEATURE')
for feature_name in feature_table.keys():
if not feature_name:
@ -155,12 +196,8 @@ class FeatureHandler(object):
continue
feature = Feature(feature_name, feature_table[feature_name], self._device_config)
self._cached_config.setdefault(feature_name, feature)
self.update_feature_auto_restart(feature)
self.update_feature_state(feature)
self.resync_feature_state(feature)
if not feature.compare_state(feature_name, feature_table.get(feature_name, {})):
self.resync_feature_state(feature)
def update_feature_state(self, feature):
cached_feature = self._cached_config[feature.name]
@ -203,16 +240,33 @@ class FeatureHandler(object):
return True
def update_feature_auto_restart(self, feature):
def update_feature_auto_restart(self, feature, feature_name):
dir_name = self.SYSTEMD_SERVICE_CONF_DIR.format(feature_name)
auto_restart_conf = os.path.join(dir_name, 'auto_restart.conf')
write_conf = False
if not os.path.exists(auto_restart_conf): # if the auto_restart_conf file is not found, set it
write_conf = True
if self._cached_config[feature_name].auto_restart != feature.auto_restart:
write_conf = True
if not write_conf:
return
self._cached_config[feature_name].auto_restart = feature.auto_restart # Update Cache
restart_config = "always" if feature.auto_restart == "enabled" else "no"
service_conf = "[Service]\nRestart={}\n".format(restart_config)
feature_names, feature_suffixes = self.get_feature_attribute(feature)
for feature_name in feature_names:
dir_name = self.SYSTEMD_SERVICE_CONF_DIR.format(feature_name)
for name in feature_names:
dir_name = self.SYSTEMD_SERVICE_CONF_DIR.format(name)
auto_restart_conf = os.path.join(dir_name, 'auto_restart.conf')
if not os.path.exists(dir_name):
os.mkdir(dir_name)
with open(os.path.join(dir_name, 'auto_restart.conf'), 'w') as cfgfile:
with open(auto_restart_conf, 'w') as cfgfile:
cfgfile.write(service_conf)
try:
@ -250,7 +304,6 @@ class FeatureHandler(object):
props = dict([line.split("=") for line in stdout.decode().strip().splitlines()])
return props["UnitFileState"]
def enable_feature(self, feature):
cmds = []
feature_names, feature_suffixes = self.get_feature_attribute(feature)
@ -321,10 +374,6 @@ class Iptables(object):
'''
return (isinstance(key, tuple))
def load(self, lpbk_table):
for row in lpbk_table:
self.iptables_handler(row, lpbk_table[row])
def command(self, chain, ip, ver, op):
cmd = 'iptables' if ver == '4' else 'ip6tables'
cmd += ' -t mangle --{} {} -p tcp --tcp-flags SYN SYN'.format(op, chain)
@ -375,11 +424,8 @@ class Iptables(object):
for cmd in iptables_cmds:
syslog.syslog(syslog.LOG_INFO, "Running cmd - {}".format(cmd))
try:
subprocess.check_call(cmd, shell=True)
except subprocess.CalledProcessError as err:
syslog.syslog(syslog.LOG_ERR, "'{}' failed. RC: {}, output: {}"
.format(err.cmd, err.returncode, err.output))
run_cmd(cmd)
class AaaCfg(object):
def __init__(self):
@ -719,6 +765,7 @@ class AaaCfg(object):
"{} - failed: return code - {}, output:\n{}"
.format(err.cmd, err.returncode, err.output))
class KdumpCfg(object):
def __init__(self, CfgDb):
self.config_db = CfgDb
@ -727,19 +774,17 @@ class KdumpCfg(object):
"num_dumps": "3" }
def load(self, kdump_table):
syslog.syslog(syslog.LOG_INFO, "KdumpCfg load ...")
data = {}
"""
Set the KDUMP table in CFG DB to kdump_defaults if not set by the user
"""
syslog.syslog(syslog.LOG_INFO, "KdumpCfg init ...")
kdump_conf = kdump_table.get("config", {})
for row in self.kdump_defaults:
value = self.kdump_defaults.get(row)
if kdump_conf.get(row) is not None:
value = kdump_conf.get(row)
else:
self.config_db.mod_entry("KDUMP", "config", { row : value})
data[row] = value
self.kdump_update("config", data, True)
if not kdump_conf.get(row):
self.config_db.mod_entry("KDUMP", "config", {row : value})
def kdump_update(self, key, data, isLoad):
def kdump_update(self, key, data):
syslog.syslog(syslog.LOG_INFO, "Kdump global configuration update")
if key == "config":
# Admin mode
@ -759,136 +804,124 @@ class KdumpCfg(object):
memory = self.kdump_defaults["memory"]
if data.get("memory") is not None:
memory = data.get("memory")
if isLoad or data.get("memory") is not None:
if data.get("memory") is not None:
run_cmd("sonic-kdump-config --memory " + memory)
# Num dumps
num_dumps = self.kdump_defaults["num_dumps"]
if data.get("num_dumps") is not None:
num_dumps = data.get("num_dumps")
if isLoad or data.get("num_dumps") is not None:
if data.get("num_dumps") is not None:
run_cmd("sonic-kdump-config --num_dumps " + num_dumps)
class NtpCfg(object):
def __init__(self, CfgDb):
self.config_db = CfgDb
"""
NtpCfg Config Daemon
1) ntp-config.service handles the configuration updates and then starts ntp.service
2) Both of them start after all the feature services start
3) Purpose of this daemon is to propagate runtime config changes in
NTP, NTP_SERVER and LOOPBACK_INTERFACE
"""
def __init__(self):
self.ntp_global = {}
self.has_ntp_servers = False
self.ntp_servers = set()
def load(self, ntp_global_conf, ntp_server_conf):
syslog.syslog(syslog.LOG_INFO, "NtpCfg load ...")
for row in ntp_global_conf:
self.ntp_global_update(row, ntp_global_conf[row], True)
self.ntp_server_update(0, ntp_server_conf, True)
def handle_ntp_source_intf_chg (self, key):
def handle_ntp_source_intf_chg(self, intf_name):
# if no ntp server configured, do nothing
if self.has_ntp_servers == False:
if not self.ntp_servers:
return
# check only the intf configured as source interface
if (len(self.ntp_global) == 0):
return
if 'src_intf' not in self.ntp_global:
return
if key[0] != self.ntp_global['src_intf']:
if intf_name not in self.ntp_global.get('src_intf', '').split(';'):
return
else:
# just restart ntp config
cmd = 'systemctl restart ntp-config'
run_cmd(cmd)
def ntp_global_update(self, key, data, isLoad):
syslog.syslog(syslog.LOG_INFO, "ntp global configuration update")
new_src = new_vrf = orig_src = orig_vrf = ""
if 'src_intf' in data:
new_src = data['src_intf']
if 'vrf' in data:
new_vrf = data['vrf']
if (len(self.ntp_global) != 0):
if 'src_intf' in self.ntp_global:
orig_src = self.ntp_global['src_intf']
if 'vrf' in self.ntp_global:
orig_vrf = self.ntp_global['vrf']
def ntp_global_update(self, key, data):
syslog.syslog(syslog.LOG_INFO, 'NTP GLOBAL Update')
orig_src = self.ntp_global.get('src_intf', '')
orig_src_set = set(orig_src.split(";"))
orig_vrf = self.ntp_global.get('vrf', '')
new_src = data.get('src_intf', '')
new_src_set = set(new_src.split(";"))
new_vrf = data.get('vrf', '')
# Update the Local Cache
self.ntp_global = data
# during initial load of ntp configuration, ntp server configuration decides if to restart ntp-config
if (isLoad):
syslog.syslog(syslog.LOG_INFO, "ntp global update in load")
return
# check if ntp server configured, if not, do nothing
if self.has_ntp_servers == False:
syslog.syslog(syslog.LOG_INFO, "no ntp server when global config change, do nothing")
return
if not self.ntp_servers:
syslog.syslog(syslog.LOG_INFO, "No ntp server when global config change, do nothing")
return
if (new_src != orig_src):
if orig_src_set != new_src_set:
syslog.syslog(syslog.LOG_INFO, "ntp global update for source intf old {} new {}, restarting ntp-config"
.format(orig_src, new_src))
.format(orig_src_set, new_src_set))
cmd = 'systemctl restart ntp-config'
run_cmd(cmd)
else:
if (new_vrf != orig_vrf):
syslog.syslog(syslog.LOG_INFO, "ntp global update for vrf old {} new {}, restarting ntp service"
.format(orig_vrf, new_vrf))
cmd = 'service ntp restart'
run_cmd(cmd)
elif new_vrf != orig_vrf:
syslog.syslog(syslog.LOG_INFO, "ntp global update for vrf old {} new {}, restarting ntp service"
.format(orig_vrf, new_vrf))
cmd = 'service ntp restart'
run_cmd(cmd)
def ntp_server_update(self, key, data, isLoad):
syslog.syslog(syslog.LOG_INFO, 'ntp server update key {} data {}'.format(key, data))
def ntp_server_update(self, key, op):
syslog.syslog(syslog.LOG_INFO, 'ntp server update key {}'.format(key))
# during load, restart ntp-config regardless if ntp server is configured or not
if isLoad == True:
if data != {}:
self.has_ntp_servers = True
else:
# for runtime ntp server change, to determine if there is ntp server configured, need to
# get from configDB, as delete triggers 2 event handling
ntp_servers_tbl = self.config_db.get_table('NTP_SERVER')
if ntp_servers_tbl != {}:
self.has_ntp_servers = True
else:
self.has_ntp_servers = False
cmd = 'systemctl restart ntp-config'
syslog.syslog(syslog.LOG_INFO, 'ntp server update, restarting ntp-config, ntp server exists {}'.format(self.has_ntp_servers))
run_cmd(cmd)
restart_config = False
if op == "SET" and key not in self.ntp_servers:
restart_config = True
self.ntp_servers.add(key)
elif op == "DEL" and key in self.ntp_servers:
restart_config = True
self.ntp_servers.remove(key)
if restart_config:
cmd = 'systemctl restart ntp-config'
syslog.syslog(syslog.LOG_INFO, 'ntp server update, restarting ntp-config, ntp servers configured {}'.format(self.ntp_servers))
run_cmd(cmd)
class HostConfigDaemon:
def __init__(self):
# Just a sanity check to verify if the CONFIG_DB has been initialized
# before moving forward
self.config_db = ConfigDBConnector()
self.config_db.connect(wait_for_init=True, retry_on=True)
self.dbconn = DBConnector(CFG_DB, 0)
self.selector = Select()
syslog.syslog(syslog.LOG_INFO, 'ConfigDB connect success')
self.select = Select()
self.callbacks = dict()
self.subscriber_map = dict()
# Load DEVICE metadata configurations
self.device_config = {}
self.device_config['DEVICE_METADATA'] = self.config_db.get_table('DEVICE_METADATA')
self.hostname_cache=""
self.aaacfg = AaaCfg()
self.iptables = Iptables()
self.feature_handler = FeatureHandler(self.config_db, self.device_config)
self.ntpcfg = NtpCfg(self.config_db)
self.is_multi_npu = device_info.is_multi_npu()
# Load Kdump configuration
# Initialize KDump Config and set the config to default if nothing is provided
self.kdumpCfg = KdumpCfg(self.config_db)
self.kdumpCfg.load(self.config_db.get_table('KDUMP'))
# Initialize IpTables
self.iptables = Iptables()
# Initialize Feature Handler
self.feature_handler = FeatureHandler(self.config_db, self.device_config)
self.feature_handler.sync_state_field()
# Initialize Ntp Config Handler
self.ntpcfg = NtpCfg()
self.is_multi_npu = device_info.is_multi_npu()
# Initialize AAACfg
self.hostname_cache=""
self.aaacfg = AaaCfg()
def load(self):
aaa = self.config_db.get_table('AAA')
@ -898,14 +931,6 @@ class HostConfigDaemon:
radius_server = self.config_db.get_table('RADIUS_SERVER')
self.aaacfg.load(aaa, tacacs_global, tacacs_server, radius_global, radius_server)
lpbk_table = self.config_db.get_table('LOOPBACK_INTERFACE')
self.iptables.load(lpbk_table)
# Load NTP configurations
ntp_server = self.config_db.get_table('NTP_SERVER')
ntp_global = self.config_db.get_table('NTP')
self.ntpcfg.load(ntp_global, ntp_server)
try:
dev_meta = self.config_db.get_table('DEVICE_METADATA')
if 'localhost' in dev_meta:
@ -913,130 +938,171 @@ class HostConfigDaemon:
self.hostname_cache = dev_meta['localhost']['hostname']
except Exception as e:
pass
# Update AAA with the hostname
self.aaacfg.hostname_update(self.hostname_cache)
def aaa_handler(self, key, data):
self.aaacfg.aaa_update(key, data)
def __get_intf_name(self, key):
    """Extract the interface name from a CONFIG_DB interface-table key.

    Interface keys are either a plain name or a (name, ip_prefix)
    tuple; return just the interface name in both cases.
    """
    if isinstance(key, tuple) and key:
        return key[0]
    return key
def tacacs_server_handler(self, key, data):
def aaa_handler(self, key, op, data):
    """Apply an AAA table change and record it in syslog."""
    self.aaacfg.aaa_update(key, data)
    log_msg = 'AAA Update: key: {}, op: {}, data: {}'.format(key, op, data)
    syslog.syslog(syslog.LOG_INFO, log_msg)
def tacacs_server_handler(self, key, op, data):
self.aaacfg.tacacs_server_update(key, data)
log_data = copy.deepcopy(data)
if 'passkey' in log_data:
log_data['passkey'] = obfuscate(log_data['passkey'])
syslog.syslog(syslog.LOG_INFO, 'value of {} changed to {}'.format(key, log_data))
syslog.syslog(syslog.LOG_INFO, 'TACPLUS_SERVER update: key: {}, op: {}, data: {}'.format(key, op, log_data))
def tacacs_global_handler(self, key, data):
def tacacs_global_handler(self, key, op, data):
self.aaacfg.tacacs_global_update(key, data)
log_data = copy.deepcopy(data)
if 'passkey' in log_data:
log_data['passkey'] = obfuscate(log_data['passkey'])
syslog.syslog(syslog.LOG_INFO, 'value of {} changed to {}'.format(key, log_data))
syslog.syslog(syslog.LOG_INFO, 'TACPLUS Global update: key: {}, op: {}, data: {}'.format(key, op, log_data))
def radius_server_handler(self, key, data):
def radius_server_handler(self, key, op, data):
self.aaacfg.radius_server_update(key, data)
log_data = copy.deepcopy(data)
if 'passkey' in log_data:
log_data['passkey'] = obfuscate(log_data['passkey'])
syslog.syslog(syslog.LOG_INFO, 'value of {} changed to {}'.format(key, log_data))
syslog.syslog(syslog.LOG_INFO, 'RADIUS_SERVER update: key: {}, op: {}, data: {}'.format(key, op, log_data))
def radius_global_handler(self, key, data):
def radius_global_handler(self, key, op, data):
self.aaacfg.radius_global_update(key, data)
log_data = copy.deepcopy(data)
if 'passkey' in log_data:
log_data['passkey'] = obfuscate(log_data['passkey'])
syslog.syslog(syslog.LOG_INFO, 'value of {} changed to {}'.format(key, log_data))
syslog.syslog(syslog.LOG_INFO, 'RADIUS Global update: key: {}, op: {}, data: {}'.format(key, op, log_data))
def mgmt_intf_handler(self, key, data):
self.aaacfg.handle_radius_source_intf_ip_chg(key)
self.aaacfg.handle_radius_nas_ip_chg(key)
def lpbk_handler(self, key, data):
def mgmt_intf_handler(self, key, op, data):
key = ConfigDBConnector.deserialize_key(key)
# Check if delete operation by fetch existing keys
keys = self.config_db.get_keys('LOOPBACK_INTERFACE')
if key in keys:
add = True
else:
mgmt_intf_name = self.__get_intf_name(key)
self.aaacfg.handle_radius_source_intf_ip_chg(mgmt_intf_name)
self.aaacfg.handle_radius_nas_ip_chg(mgmt_intf_name)
def lpbk_handler(self, key, op, data):
key = ConfigDBConnector.deserialize_key(key)
if op == "DEL":
add = False
else:
add = True
self.iptables.iptables_handler(key, data, add)
self.ntpcfg.handle_ntp_source_intf_chg(key)
lpbk_name = self.__get_intf_name(key)
self.ntpcfg.handle_ntp_source_intf_chg(lpbk_name)
self.aaacfg.handle_radius_source_intf_ip_chg(key)
def vlan_intf_handler(self, key, data):
def vlan_intf_handler(self, key, op, data):
key = ConfigDBConnector.deserialize_key(key)
self.aaacfg.handle_radius_source_intf_ip_chg(key)
def vlan_sub_intf_handler(self, key, data):
def vlan_sub_intf_handler(self, key, op, data):
key = ConfigDBConnector.deserialize_key(key)
self.aaacfg.handle_radius_source_intf_ip_chg(key)
def portchannel_intf_handler(self, key, data):
def portchannel_intf_handler(self, key, op, data):
key = ConfigDBConnector.deserialize_key(key)
self.aaacfg.handle_radius_source_intf_ip_chg(key)
def phy_intf_handler(self, key, data):
def phy_intf_handler(self, key, op, data):
key = ConfigDBConnector.deserialize_key(key)
self.aaacfg.handle_radius_source_intf_ip_chg(key)
def ntp_server_handler (self, key, data):
syslog.syslog(syslog.LOG_INFO, 'NTP server handler...')
ntp_server_db = self.config_db.get_table('NTP_SERVER')
data = ntp_server_db
self.ntpcfg.ntp_server_update(key, data, False)
def ntp_server_handler(self, key, op, data):
    """Forward an NTP_SERVER table update to the NTP config handler.

    Only the key and operation matter; the row data is unused because
    NTP server entries carry no fields beyond the server address key.
    """
    self.ntpcfg.ntp_server_update(key, op)
def ntp_global_handler (self, key, data):
syslog.syslog(syslog.LOG_INFO, 'NTP global handler...')
self.ntpcfg.ntp_global_update(key, data, False)
def ntp_global_handler(self, key, op, data):
    """Relay an NTP global-table change to the NTP config handler."""
    self.ntpcfg.ntp_global_update(key, data)
def kdump_handler (self, key, data):
def kdump_handler (self, key, op, data):
syslog.syslog(syslog.LOG_INFO, 'Kdump handler...')
self.kdumpCfg.kdump_update(key, data, False)
self.kdumpCfg.kdump_update(key, data)
def wait_till_system_init_done(self):
    """Block until systemd reports system startup has finished.

    '--wait' makes systemctl block until initialization completes;
    '--quiet' suppresses the state string so nothing is logged.
    """
    # Argument-list form avoids spawning an intermediate shell; it is
    # equivalent here since the command is a fixed constant.
    systemctl_cmd = ["sudo", "systemctl", "is-system-running", "--wait", "--quiet"]
    subprocess.call(systemctl_cmd)
def start(self):
self.config_db.subscribe('AAA', lambda table, key, data: self.aaa_handler(key, data))
self.config_db.subscribe('TACPLUS_SERVER', lambda table, key, data: self.tacacs_server_handler(key, data))
self.config_db.subscribe('TACPLUS', lambda table, key, data: self.tacacs_global_handler(key, data))
self.config_db.subscribe('RADIUS_SERVER', lambda table, key, data: self.radius_server_handler(key, data))
self.config_db.subscribe('RADIUS', lambda table, key, data: self.radius_global_handler(key, data))
self.config_db.subscribe('MGMT_INTERFACE', lambda table, key, data: self.mgmt_intf_handler(key, data))
self.config_db.subscribe('LOOPBACK_INTERFACE', lambda table, key, data: self.lpbk_handler(key, data))
self.config_db.subscribe('FEATURE', lambda table, key, data: self.feature_handler.handle(key, data))
self.config_db.subscribe('VLAN_INTERFACE', lambda table, key, data: self.vlan_intf_handler(key, data))
self.config_db.subscribe('VLAN_SUB_INTERFACE', lambda table, key, data: self.vlan_sub_intf_handler(key, data))
self.config_db.subscribe('PORTCHANNEL_INTERFACE', lambda table, key, data: self.portchannel_intf_handler(key, data))
self.config_db.subscribe('INTERFACE', lambda table, key, data: self.phy_intf_handler(key, data))
self.config_db.subscribe('NTP_SERVER', lambda table, key, data: self.ntp_server_handler(key, data))
self.config_db.subscribe('NTP', lambda table, key, data: self.ntp_global_handler(key, data))
self.config_db.subscribe('KDUMP', lambda table, key, data: self.kdump_handler(key, data))
def subscribe(self, table, callback, pri):
    """Register *callback* for CONFIG_DB updates on *table* at priority *pri*.

    Creates a SubscriberStateTable, adds it to the selector and records
    the fd -> (subscriber, table) mapping consumed by the event loop.
    Failures are logged rather than raised so one bad table cannot
    prevent the remaining subscriptions.
    """
    try:
        if table not in self.callbacks:
            self.callbacks[table] = []
        sub = SubscriberStateTable(self.dbconn, table, TableConsumable.DEFAULT_POP_BATCH_SIZE, pri)
        # Register with the selector and remember which table this fd serves
        self.selector.addSelectable(sub)
        self.subscriber_map[sub.getFd()] = (sub, table)
        self.callbacks[table].append(callback)
    except Exception as err:
        syslog.syslog(syslog.LOG_ERR, "Subscribe to table {} failed with error {}".format(table, err))
def register_callbacks(self):
self.subscribe('KDUMP', lambda table, key, op, data: self.kdump_handler(key, op, data), HOSTCFGD_MAX_PRI)
# Handle FEATURE updates before other tables
self.subscribe('FEATURE', lambda table, key, op, data: self.feature_handler.handle(key, op, data), HOSTCFGD_MAX_PRI-1)
# Handle AAA, TACACS and RADIUS related tables
self.subscribe('AAA', lambda table, key, op, data: self.aaa_handler(key, op, data), HOSTCFGD_MAX_PRI-2)
self.subscribe('TACPLUS', lambda table, key, op, data: self.tacacs_global_handler(key, op, data), HOSTCFGD_MAX_PRI-2)
self.subscribe('TACPLUS_SERVER', lambda table, key, op, data: self.tacacs_server_handler(key, op, data), HOSTCFGD_MAX_PRI-2)
self.subscribe('RADIUS', lambda table, key, op, data: self.radius_global_handler(key, op, data), HOSTCFGD_MAX_PRI-2)
self.subscribe('RADIUS_SERVER', lambda table, key, op, data: self.radius_server_handler(key, op, data), HOSTCFGD_MAX_PRI-2)
# Handle IPTables configuration
self.subscribe('LOOPBACK_INTERFACE', lambda table, key, op, data: self.lpbk_handler(key, op, data), HOSTCFGD_MAX_PRI-3)
# Handle NTP & NTP_SERVER updates
self.subscribe('NTP', lambda table, key, op, data: self.ntp_global_handler(key, op, data), HOSTCFGD_MAX_PRI-4)
self.subscribe('NTP_SERVER', lambda table, key, op, data: self.ntp_server_handler(key, op, data), HOSTCFGD_MAX_PRI-4)
# Handle updates to src intf changes in radius
self.subscribe('MGMT_INTERFACE', lambda table, key, op, data: self.mgmt_intf_handler(key, op, data), HOSTCFGD_MAX_PRI-5)
self.subscribe('VLAN_INTERFACE', lambda table, key, op, data: self.vlan_intf_handler(key, op, data), HOSTCFGD_MAX_PRI-5)
self.subscribe('VLAN_SUB_INTERFACE', lambda table, key, op, data: self.vlan_sub_intf_handler(key, op, data), HOSTCFGD_MAX_PRI-5)
self.subscribe('PORTCHANNEL_INTERFACE', lambda table, key, op, data: self.portchannel_intf_handler(key, op, data), HOSTCFGD_MAX_PRI-5)
self.subscribe('INTERFACE', lambda table, key, op, data: self.phy_intf_handler(key, op, data), HOSTCFGD_MAX_PRI-5)
syslog.syslog(syslog.LOG_INFO,
"Waiting for systemctl to finish initialization")
self.wait_till_system_init_done()
syslog.syslog(syslog.LOG_INFO,
"systemctl has finished initialization -- proceeding ...")
# Update all feature states once upon starting
self.feature_handler.update_all_features_config()
def start(self):
while True:
state, selectable_ = self.selector.select(DEFAULT_SELECT_TIMEOUT)
if state == self.selector.TIMEOUT:
continue
elif state == self.selector.ERROR:
syslog.syslog(syslog.LOG_ERR,
"error returned by select")
continue
# Defer load until subscribe
self.load()
self.config_db.listen()
fd = selectable_.getFd()
# Get the Corresponding subscriber & table
subscriber, table = self.subscriber_map.get(fd, (None, ""))
if not subscriber:
syslog.syslog(syslog.LOG_ERR,
"No Subscriber object found for fd: {}, subscriber map: {}".format(fd, subscriber_map))
continue
key, op, fvs = subscriber.pop()
# Get the registered callback
cbs = self.callbacks.get(table, None)
for callback in cbs:
callback(table, key, op, dict(fvs))
def main():
    """Entry point: install signal handlers, build the daemon, run its event loop."""
    # SIGTERM/SIGINT exit cleanly; SIGHUP is logged and ignored (see signal_handler).
    signal.signal(signal.SIGTERM, signal_handler)
    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGHUP, signal_handler)
    daemon = HostConfigDaemon()
    # Register table callbacks before the initial load so no update is missed.
    daemon.register_callbacks()
    daemon.load()
    daemon.start()
if __name__ == "__main__":
main()

View File

@ -26,8 +26,7 @@ setup(
'dbus-python',
'Jinja2>=2.10',
'PyGObject',
'sonic-py-common',
'systemd-python',
'sonic-py-common'
],
setup_requires = [
'pytest-runner',
@ -37,7 +36,8 @@ setup(
'parameterized',
'pytest',
'pyfakefs',
'sonic-py-common'
'sonic-py-common',
'deepdiff'
],
classifiers = [
'Development Status :: 3 - Alpha',

View File

@ -0,0 +1,118 @@
class MockConfigDb(object):
    """
    Mock Config DB which responds to data table requests and stores updates
    to the data table. The backing store is the class-level CONFIG_DB dict,
    shaped as {table_name: {entry_key: {field: value}}}.
    """
    STATE_DB = None
    CONFIG_DB = None

    def __init__(self, **kwargs):
        # Accept arbitrary kwargs for signature compatibility with
        # swsscommon.ConfigDBConnector; nothing to initialize.
        pass

    @staticmethod
    def set_config_db(test_config_db):
        """Install *test_config_db* as the shared backing store."""
        MockConfigDb.CONFIG_DB = test_config_db

    @staticmethod
    def deserialize_key(key, separator="|"):
        """Split a serialized key into a tuple; plain keys pass through."""
        tokens = key.split(separator)
        if len(tokens) > 1:
            return tuple(tokens)
        else:
            return key

    @staticmethod
    def get_config_db():
        """Return the shared backing store."""
        return MockConfigDb.CONFIG_DB

    def connect(self, wait_for_init=True, retry_on=True):
        # No real database; connecting is a no-op.
        pass

    def get(self, db_id, key, field):
        """Return table *key*, entry *field* (db_id is ignored by the mock)."""
        return MockConfigDb.CONFIG_DB[key][field]

    def get_entry(self, key, field):
        """Return the entry *field* from table *key*."""
        return MockConfigDb.CONFIG_DB[key][field]

    def mod_entry(self, key, field, data):
        """Merge *data* into table *key*, entry *field*.

        Like the real ConfigDBConnector.mod_entry, the table and entry are
        created when missing instead of raising KeyError.
        """
        existing_data = MockConfigDb.CONFIG_DB.setdefault(key, {}).setdefault(field, {})
        existing_data.update(data)
        self.set_entry(key, field, existing_data)

    def set_entry(self, key, field, data):
        """Replace table *key*, entry *field* with *data*."""
        MockConfigDb.CONFIG_DB.setdefault(key, {})[field] = data

    def get_table(self, table_name):
        """Return the whole table dict for *table_name*."""
        return MockConfigDb.CONFIG_DB[table_name]
class MockSelect():
    """Mock of swsscommon.Select driven by a shared FIFO of (table, key) events."""

    # Class-level queue so tests can stage events before select() is called
    event_queue = []

    @staticmethod
    def set_event_queue(Q):
        """Replace the pending-event queue."""
        MockSelect.event_queue = Q

    @staticmethod
    def get_event_queue():
        """Return the pending-event queue."""
        return MockSelect.event_queue

    @staticmethod
    def reset_event_queue():
        """Drop all pending events."""
        MockSelect.event_queue = []

    def __init__(self):
        # table name -> registered subscriber object
        self.sub_map = {}
        self.TIMEOUT = "TIMEOUT"
        self.ERROR = "ERROR"

    def addSelectable(self, subscriber):
        """Register *subscriber* under its table name."""
        self.sub_map[subscriber.table] = subscriber

    def select(self, TIMEOUT):
        """Pop the oldest staged event and hand its key to the subscriber.

        Raises TimeoutError when no events are staged, ending the test's
        event loop.
        """
        pending = MockSelect.get_event_queue()
        if not pending:
            raise TimeoutError
        table, key = pending.pop(0)
        selected = self.sub_map[table]
        selected.nextKey(key)
        return "OBJECT", selected
class MockSubscriberStateTable():
    """Mock of swsscommon.SubscriberStateTable backed by MockConfigDb."""

    # Monotonic counter used to hand out unique fake file descriptors
    FD_INIT = 0

    @staticmethod
    def generate_fd():
        """Return the next unique fake fd."""
        fd = MockSubscriberStateTable.FD_INIT
        MockSubscriberStateTable.FD_INIT = fd + 1
        return fd

    @staticmethod
    def reset_fd():
        """Restart fd numbering from zero (for test isolation)."""
        MockSubscriberStateTable.FD_INIT = 0

    def __init__(self, conn, table, pop, pri):
        # conn/pop/pri are accepted only for signature compatibility
        self.fd = MockSubscriberStateTable.generate_fd()
        self.next_key = ''
        self.table = table

    def getFd(self):
        """Return this subscriber's fake fd."""
        return self.fd

    def nextKey(self, key):
        """Stage *key* to be returned by the next pop()."""
        self.next_key = key

    def pop(self):
        """Return (key, op, fvs) for the staged key.

        The op is 'SET' with the row's fields when the key exists in the
        mock CONFIG_DB table, 'DEL' with empty fields otherwise.
        """
        entries = MockConfigDb.CONFIG_DB.get(self.table, {})
        if self.next_key in entries:
            return self.next_key, "SET", entries.get(self.next_key, {})
        return self.next_key, "DEL", {}
class MockDBConnector():
    """No-op stand-in for swsscommon.DBConnector; there is nothing to connect to."""

    def __init__(self, db, val):
        # Arguments are accepted only for signature compatibility.
        pass

View File

@ -10,10 +10,10 @@ from swsscommon import swsscommon
from parameterized import parameterized
from unittest import TestCase, mock
from tests.hostcfgd.test_radius_vectors import HOSTCFGD_TEST_RADIUS_VECTOR
from tests.hostcfgd.mock_configdb import MockConfigDb
from tests.common.mock_configdb import MockConfigDb, MockSubscriberStateTable
from tests.common.mock_configdb import MockSelect, MockDBConnector
swsscommon.ConfigDBConnector = MockConfigDb
test_path = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
modules_path = os.path.dirname(test_path)
scripts_path = os.path.join(modules_path, "scripts")
@ -31,6 +31,12 @@ hostcfgd = importlib.util.module_from_spec(spec)
loader.exec_module(hostcfgd)
sys.modules['hostcfgd'] = hostcfgd
# Mock swsscommon classes
hostcfgd.ConfigDBConnector = MockConfigDb
hostcfgd.SubscriberStateTable = MockSubscriberStateTable
hostcfgd.Select = MockSelect
hostcfgd.DBConnector = MockDBConnector
class TestHostcfgdRADIUS(TestCase):
"""
@ -44,11 +50,9 @@ class TestHostcfgdRADIUS(TestCase):
def test_hostcfgd_radius(self, test_name, test_data):
"""
Test RADIUS hostcfd daemon initialization
Args:
test_name(str): test name
test_data(dict): test data which contains initial Config Db tables, and expected results
Returns:
None
"""

View File

@ -1,26 +1,32 @@
import os
import sys
import swsscommon
import swsscommon as swsscommon_package
from swsscommon import swsscommon
from parameterized import parameterized
from sonic_py_common.general import load_module_from_source
from unittest import TestCase, mock
from .test_vectors import HOSTCFGD_TEST_VECTOR
from .mock_configdb import MockConfigDb
from .test_vectors import HOSTCFGD_TEST_VECTOR, HOSTCFG_DAEMON_CFG_DB
from tests.common.mock_configdb import MockConfigDb, MockSubscriberStateTable
from tests.common.mock_configdb import MockSelect, MockDBConnector
from pyfakefs.fake_filesystem_unittest import patchfs
from deepdiff import DeepDiff
from unittest.mock import call
swsscommon.swsscommon.ConfigDBConnector = MockConfigDb
test_path = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
modules_path = os.path.dirname(test_path)
scripts_path = os.path.join(modules_path, "scripts")
scripts_path = os.path.join(modules_path, 'scripts')
sys.path.insert(0, modules_path)
# Load the file under test
hostcfgd_path = os.path.join(scripts_path, 'hostcfgd')
hostcfgd = load_module_from_source('hostcfgd', hostcfgd_path)
hostcfgd.ConfigDBConnector = MockConfigDb
hostcfgd.SubscriberStateTable = MockSubscriberStateTable
hostcfgd.Select = MockSelect
hostcfgd.DBConnector = MockDBConnector
class TestHostcfgd(TestCase):
@ -40,18 +46,36 @@ class TestHostcfgd(TestCase):
Returns:
None
"""
is_equal = len(table) == len(expected_table)
if is_equal:
for key, fields in expected_table.items():
is_equal = is_equal and key in table and len(fields) == len(table[key])
if is_equal:
for field, value in fields.items():
is_equal = is_equal and value == table[key][field]
if not is_equal:
break;
else:
break
return is_equal
ddiff = DeepDiff(table, expected_table, ignore_order=True)
print('DIFF:', ddiff)
return True if not ddiff else False
def __verify_fs(self, table):
    """
    Verify filesystem changes made by hostcfgd.

    Checks that a systemd override file (auto_restart.conf) was generated
    for every feature in *table* and that its Restart= value matches the
    feature's auto_restart setting. Fails via assert on any mismatch;
    returns nothing.

    Args:
        table(dict): Current Config DB FEATURE table
    """
    # Expected Restart= value for each auto_restart setting
    exp_dict = {
        'enabled': 'always',
        'disabled': 'no',
    }
    # SYSTEMD_SERVICE_CONF_DIR contains a '{}' placeholder for the feature name
    auto_restart_conf = os.path.join(hostcfgd.FeatureHandler.SYSTEMD_SERVICE_CONF_DIR, 'auto_restart.conf')
    for feature in table:
        auto_restart = table[feature].get('auto_restart', 'disabled')
        with open(auto_restart_conf.format(feature)) as conf:
            conf = conf.read().strip()
        assert conf == '[Service]\nRestart={}'.format(exp_dict[auto_restart])
def __verify_fs(self, table):
"""
@ -82,9 +106,9 @@ class TestHostcfgd(TestCase):
@parameterized.expand(HOSTCFGD_TEST_VECTOR)
@patchfs
def test_hostcfgd(self, test_name, test_data, fs):
def test_hostcfgd_feature_handler(self, test_name, test_data, fs):
"""
Test hostcfd daemon initialization
Test feature config capability in the hostcfd
Args:
test_name(str): test name
@ -93,24 +117,34 @@ class TestHostcfgd(TestCase):
Returns:
None
"""
fs.add_real_paths(swsscommon.__path__) # add real path of swsscommon for database_config.json
fs.add_real_paths(swsscommon_package.__path__) # add real path of swsscommon for database_config.json
fs.create_dir(hostcfgd.FeatureHandler.SYSTEMD_SYSTEM_DIR)
MockConfigDb.set_config_db(test_data["config_db"])
with mock.patch("hostcfgd.subprocess") as mocked_subprocess:
MockConfigDb.set_config_db(test_data['config_db'])
with mock.patch('hostcfgd.subprocess') as mocked_subprocess:
popen_mock = mock.Mock()
attrs = test_data["popen_attributes"]
attrs = test_data['popen_attributes']
popen_mock.configure_mock(**attrs)
mocked_subprocess.Popen.return_value = popen_mock
host_config_daemon = hostcfgd.HostConfigDaemon()
host_config_daemon.feature_handler.update_all_features_config()
assert self.__verify_table(
MockConfigDb.get_config_db()["FEATURE"],
test_data["expected_config_db"]["FEATURE"]
), "Test failed for test data: {0}".format(test_data)
mocked_subprocess.check_call.assert_has_calls(test_data["expected_subprocess_calls"], any_order=True)
# Initialize Feature Handler
device_config = {}
device_config['DEVICE_METADATA'] = MockConfigDb.CONFIG_DB['DEVICE_METADATA']
feature_handler = hostcfgd.FeatureHandler(MockConfigDb(), device_config)
self.__verify_fs(test_data["config_db"]["FEATURE"])
# sync the state field and Handle Feature Updates
feature_handler.sync_state_field()
features = MockConfigDb.CONFIG_DB['FEATURE']
for key, fvs in features.items():
feature_handler.handle(key, 'SET', fvs)
# Verify if the updates are properly updated
assert self.__verify_table(
MockConfigDb.get_config_db()['FEATURE'],
test_data['expected_config_db']['FEATURE']
), 'Test failed for test data: {0}'.format(test_data)
mocked_subprocess.check_call.assert_has_calls(test_data['expected_subprocess_calls'], any_order=True)
self.__verify_fs(test_data['config_db']['FEATURE'])
def test_feature_config_parsing(self):
swss_feature = hostcfgd.Feature('swss', {
@ -139,3 +173,150 @@ class TestHostcfgd(TestCase):
assert not swss_feature.has_timer
assert swss_feature.has_global_scope
assert not swss_feature.has_per_asic_scope
class TesNtpCfgd(TestCase):
    """Unit tests for hostcfgd's NtpCfg handler."""

    @staticmethod
    def _install_popen_mock(mocked_subprocess):
        # Route subprocess.Popen to a mock whose communicate() returns a
        # canned (stdout, stderr) pair, so no real process is spawned.
        popen_mock = mock.Mock()
        popen_mock.configure_mock(**{'communicate.return_value': ('output', 'error')})
        mocked_subprocess.Popen.return_value = popen_mock

    def setUp(self):
        MockConfigDb.CONFIG_DB['NTP'] = {'global': {'vrf': 'mgmt', 'src_intf': 'eth0'}}
        MockConfigDb.CONFIG_DB['NTP_SERVER'] = {'0.debian.pool.ntp.org': {}}

    def tearDown(self):
        MockConfigDb.CONFIG_DB = {}

    def test_ntp_global_update_with_no_servers(self):
        """A global NTP update with no tracked servers must not restart ntp-config."""
        with mock.patch('hostcfgd.subprocess') as mocked_subprocess:
            self._install_popen_mock(mocked_subprocess)

            ntpcfgd = hostcfgd.NtpCfg()
            ntpcfgd.ntp_global_update('global', MockConfigDb.CONFIG_DB['NTP']['global'])

            mocked_subprocess.check_call.assert_not_called()

    def test_ntp_global_update_ntp_servers(self):
        """Adding an NTP server after a global update restarts ntp-config."""
        with mock.patch('hostcfgd.subprocess') as mocked_subprocess:
            self._install_popen_mock(mocked_subprocess)

            ntpcfgd = hostcfgd.NtpCfg()
            ntpcfgd.ntp_global_update('global', MockConfigDb.CONFIG_DB['NTP']['global'])
            ntpcfgd.ntp_server_update('0.debian.pool.ntp.org', 'SET')

            mocked_subprocess.check_call.assert_has_calls([call('systemctl restart ntp-config', shell=True)])

    def test_loopback_update(self):
        """A source-interface change on a tracked config restarts ntp-config."""
        with mock.patch('hostcfgd.subprocess') as mocked_subprocess:
            self._install_popen_mock(mocked_subprocess)

            ntpcfgd = hostcfgd.NtpCfg()
            ntpcfgd.ntp_global = MockConfigDb.CONFIG_DB['NTP']['global']
            ntpcfgd.ntp_servers.add('0.debian.pool.ntp.org')

            ntpcfgd.handle_ntp_source_intf_chg('eth0')
            mocked_subprocess.check_call.assert_has_calls([call('systemctl restart ntp-config', shell=True)])
class TestHostcfgdDaemon(TestCase):
    """Tests for HostConfigDaemon's event loop.

    Events are injected through MockSelect.event_queue; MockSelect raises
    TimeoutError once the queue drains, which is what breaks the test out
    of daemon.start().
    """

    def setUp(self):
        MockConfigDb.set_config_db(HOSTCFG_DAEMON_CFG_DB)

    def tearDown(self):
        MockConfigDb.CONFIG_DB = {}

    @patchfs
    def test_feature_events(self, fs):
        """FEATURE SET events must drive the matching systemctl calls."""
        fs.create_dir(hostcfgd.FeatureHandler.SYSTEMD_SYSTEM_DIR)
        MockSelect.event_queue = [('FEATURE', 'dhcp_relay'),
                                  ('FEATURE', 'mux'),
                                  ('FEATURE', 'telemetry')]
        daemon = hostcfgd.HostConfigDaemon()
        daemon.register_callbacks()
        with mock.patch('hostcfgd.subprocess') as mocked_subprocess:
            popen_mock = mock.Mock()
            attrs = {'communicate.return_value': ('output', 'error')}
            popen_mock.configure_mock(**attrs)
            mocked_subprocess.Popen.return_value = popen_mock
            try:
                daemon.start()
            except TimeoutError:
                pass
            expected = [call('sudo systemctl daemon-reload', shell=True),
                        call('sudo systemctl unmask dhcp_relay.service', shell=True),
                        call('sudo systemctl enable dhcp_relay.service', shell=True),
                        call('sudo systemctl start dhcp_relay.service', shell=True),
                        call('sudo systemctl daemon-reload', shell=True),
                        call('sudo systemctl unmask mux.service', shell=True),
                        call('sudo systemctl enable mux.service', shell=True),
                        call('sudo systemctl start mux.service', shell=True),
                        call('sudo systemctl daemon-reload', shell=True),
                        call('sudo systemctl unmask telemetry.service', shell=True),
                        call('sudo systemctl unmask telemetry.timer', shell=True),
                        call('sudo systemctl enable telemetry.timer', shell=True),
                        call('sudo systemctl start telemetry.timer', shell=True)]
            mocked_subprocess.check_call.assert_has_calls(expected)

            # Change the state to disabled
            MockConfigDb.CONFIG_DB['FEATURE']['telemetry']['state'] = 'disabled'
            MockSelect.event_queue = [('FEATURE', 'telemetry')]
            try:
                daemon.start()
            except TimeoutError:
                pass
            # Disabling a timer-based feature must tear down BOTH units.
            # BUGFIX: the last two expected calls previously repeated
            # 'telemetry.timer'; they must reference 'telemetry.service'
            # (ordered assert_has_calls could never match a second
            # timer-disable after the service stop).
            expected = [call('sudo systemctl stop telemetry.timer', shell=True),
                        call('sudo systemctl disable telemetry.timer', shell=True),
                        call('sudo systemctl mask telemetry.timer', shell=True),
                        call('sudo systemctl stop telemetry.service', shell=True),
                        call('sudo systemctl disable telemetry.service', shell=True),
                        call('sudo systemctl mask telemetry.service', shell=True)]
            mocked_subprocess.check_call.assert_has_calls(expected)

    def test_loopback_events(self):
        """NTP and LOOPBACK_INTERFACE events restart ntp-config and add TCPMSS rules."""
        MockConfigDb.set_config_db(HOSTCFG_DAEMON_CFG_DB)
        MockSelect.event_queue = [('NTP', 'global'),
                                  ('NTP_SERVER', '0.debian.pool.ntp.org'),
                                  ('LOOPBACK_INTERFACE', 'Loopback0|10.184.8.233/32')]
        daemon = hostcfgd.HostConfigDaemon()
        daemon.register_callbacks()
        with mock.patch('hostcfgd.subprocess') as mocked_subprocess:
            popen_mock = mock.Mock()
            attrs = {'communicate.return_value': ('output', 'error')}
            popen_mock.configure_mock(**attrs)
            mocked_subprocess.Popen.return_value = popen_mock
            try:
                daemon.start()
            except TimeoutError:
                pass
            expected = [call('systemctl restart ntp-config', shell=True),
                        call('iptables -t mangle --append PREROUTING -p tcp --tcp-flags SYN SYN -d 10.184.8.233 -j TCPMSS --set-mss 1460', shell=True),
                        call('iptables -t mangle --append POSTROUTING -p tcp --tcp-flags SYN SYN -s 10.184.8.233 -j TCPMSS --set-mss 1460', shell=True)]
            mocked_subprocess.check_call.assert_has_calls(expected, any_order=True)

    def test_kdump_event(self):
        """A KDUMP config event applies settings via sonic-kdump-config."""
        MockConfigDb.set_config_db(HOSTCFG_DAEMON_CFG_DB)
        daemon = hostcfgd.HostConfigDaemon()
        daemon.register_callbacks()
        # Daemon init is expected to have populated KDUMP defaults, so the
        # entry must be non-empty by now.
        assert MockConfigDb.CONFIG_DB['KDUMP']['config']
        MockSelect.event_queue = [('KDUMP', 'config')]
        with mock.patch('hostcfgd.subprocess') as mocked_subprocess:
            popen_mock = mock.Mock()
            attrs = {'communicate.return_value': ('output', 'error')}
            popen_mock.configure_mock(**attrs)
            mocked_subprocess.Popen.return_value = popen_mock
            try:
                daemon.start()
            except TimeoutError:
                pass
            expected = [call('sonic-kdump-config --disable', shell=True),
                        call('sonic-kdump-config --num_dumps 3', shell=True),
                        call('sonic-kdump-config --memory 0M-2G:256M,2G-4G:320M,4G-8G:384M,8G-:448M', shell=True)]
            mocked_subprocess.check_call.assert_has_calls(expected, any_order=True)

View File

@ -1,37 +0,0 @@
class MockConfigDb(object):
    """In-memory stand-in for swsscommon's ConfigDBConnector.

    Tests preload the class-level CONFIG_DB dict (a mapping of
    table name -> entry key -> field dict); code under test then reads
    and writes it through the ConfigDBConnector-compatible API below.
    """

    # Shared, class-level storage so every instance (and the test
    # harness) observes the same database state.
    STATE_DB = None
    CONFIG_DB = None

    def __init__(self):
        pass

    @staticmethod
    def set_config_db(test_config_db):
        """Install *test_config_db* as the current CONFIG_DB contents."""
        MockConfigDb.CONFIG_DB = test_config_db

    @staticmethod
    def get_config_db():
        """Return the full CONFIG_DB dict."""
        return MockConfigDb.CONFIG_DB

    def connect(self, wait_for_init=True, retry_on=True):
        """No-op; signature mirrors the real connector's API."""
        pass

    def get(self, db_id, key, field):
        # db_id is accepted for API compatibility but ignored: only
        # CONFIG_DB is modelled by this mock.
        return MockConfigDb.CONFIG_DB[key][field]

    def get_entry(self, key, field):
        # Mirrors ConfigDBConnector.get_entry(table, key): despite the
        # parameter name, 'field' is the entry key, not a field name.
        return MockConfigDb.CONFIG_DB[key][field]

    def mod_entry(self, key, field, data):
        # Merge *data* into the existing entry. Robustness fix: a missing
        # table/entry is created instead of raising KeyError, matching the
        # real ConfigDBConnector.mod_entry behavior.
        existing_data = MockConfigDb.CONFIG_DB.setdefault(key, {}).setdefault(field, {})
        existing_data.update(data)
        self.set_entry(key, field, existing_data)

    def set_entry(self, key, field, data):
        # Replace the entry wholesale, creating the table if needed.
        MockConfigDb.CONFIG_DB.setdefault(key, {})[field] = data

    def get_table(self, table_name):
        return MockConfigDb.CONFIG_DB[table_name]

View File

@ -492,3 +492,62 @@ HOSTCFGD_TEST_VECTOR = [
}
]
]
# Canned CONFIG_DB contents used by the TestHostcfgdDaemon event-loop tests.
HOSTCFG_DAEMON_CFG_DB = {
    "FEATURE": {
        # 'state' is a Jinja2 template rendered against DEVICE_METADATA;
        # with type == 'ToRRouter' below it evaluates to 'enabled'.
        "dhcp_relay": {
            "auto_restart": "enabled",
            "has_global_scope": "True",
            "has_per_asic_scope": "False",
            "has_timer": "False",
            "high_mem_alert": "disabled",
            "set_owner": "kube",
            "state": "{% if not (DEVICE_METADATA is defined and DEVICE_METADATA['localhost'] is defined and DEVICE_METADATA['localhost']['type'] is defined and DEVICE_METADATA['localhost']['type'] != 'ToRRouter') %}enabled{% else %}disabled{% endif %}"
        },
        # subtype == 'DualToR' below makes this template render 'enabled'.
        "mux": {
            "auto_restart": "enabled",
            "has_global_scope": "True",
            "has_per_asic_scope": "False",
            "has_timer": "False",
            "high_mem_alert": "disabled",
            "set_owner": "local",
            "state": "{% if 'subtype' in DEVICE_METADATA['localhost'] and DEVICE_METADATA['localhost']['subtype'] == 'DualToR' %}enabled{% else %}always_disabled{% endif %}"
        },
        # Timer-based feature: enabling/disabling must act on both the
        # .timer and the .service systemd units (see test_feature_events).
        "telemetry": {
            "auto_restart": "enabled",
            "has_global_scope": "True",
            "has_per_asic_scope": "False",
            "has_timer": "True",
            "high_mem_alert": "disabled",
            "set_owner": "kube",
            "state": "enabled",
            "status": "enabled"
        },
    },
    # Intentionally empty: hostcfgd is presumably expected to populate
    # kdump defaults here on startup (test_kdump_event asserts the entry
    # becomes non-empty) -- confirm against hostcfgd's KdumpCfg.
    "KDUMP": {
        "config": {
        }
    },
    "NTP": {
        "global": {
            "vrf": "default",
            # NOTE(review): looks like a semicolon-separated interface
            # list consumed by the NTP handler -- confirm the format.
            "src_intf": "eth0;Loopback0"
        }
    },
    "NTP_SERVER": {
        "0.debian.pool.ntp.org": {}
    },
    # Loopback IP referenced by the TCPMSS iptables rules expected in
    # test_loopback_events.
    "LOOPBACK_INTERFACE": {
        "Loopback0|10.184.8.233/32": {
            "scope": "global",
            "family": "IPv4"
        }
    },
    # Drives the Jinja2 feature-state templates above.
    "DEVICE_METADATA": {
        "localhost": {
            "subtype": "DualToR",
            "type": "ToRRouter",
        }
    }
}