[hostcfgd] [202012] Fixed the brief blackout in hostcfgd using SubscriberStateTable (#9228)

#### Why I did it
Backporting https://github.com/Azure/sonic-buildimage/pull/8861 to 202012
This commit is contained in:
Vivek Reddy 2021-11-22 21:57:07 -08:00 committed by GitHub
parent 7fb0f3f89f
commit edd6b847e9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 553 additions and 222 deletions

View File

@ -4,12 +4,15 @@ import ast
import copy
import ipaddress
import os
import sys
import subprocess
import syslog
import signal
import jinja2
from sonic_py_common import device_info
from swsscommon.swsscommon import ConfigDBConnector
from swsscommon.swsscommon import SubscriberStateTable, DBConnector, Select
from swsscommon.swsscommon import ConfigDBConnector, TableConsumable
# FILE
PAM_AUTH_CONF = "/etc/pam.d/common-auth-sonic"
@ -23,6 +26,34 @@ TACPLUS_SERVER_PASSKEY_DEFAULT = ""
TACPLUS_SERVER_TIMEOUT_DEFAULT = "5"
TACPLUS_SERVER_AUTH_TYPE_DEFAULT = "pap"
# MISC Constants
CFG_DB = "CONFIG_DB"
HOSTCFGD_MAX_PRI = 10 # Used to enforce ordering b/w daemons under Hostcfgd
DEFAULT_SELECT_TIMEOUT = 1000
def safe_eval(val, default_value=False):
    """ Safely evaluate the expression, without raising an exception """
    try:
        ret = ast.literal_eval(val)
    except (ValueError, SyntaxError):
        # literal_eval raises ValueError for non-literal input (e.g. "enabled")
        # but SyntaxError for unparsable input (e.g. ""); both must be
        # swallowed for this helper to honor its "never raises" contract.
        ret = default_value
    return ret
def signal_handler(sig, frame):
    """Process-level signal hook: ignore SIGHUP, exit on SIGINT/SIGTERM.

    Exit code follows the shell convention of 128 + signal number.
    """
    if sig == signal.SIGHUP:
        syslog.syslog(syslog.LOG_INFO, "HostCfgd: signal 'SIGHUP' is caught and ignoring..")
        return
    if sig == signal.SIGINT:
        syslog.syslog(syslog.LOG_INFO, "HostCfgd: signal 'SIGINT' is caught and exiting...")
        sys.exit(128 + sig)
    if sig == signal.SIGTERM:
        syslog.syslog(syslog.LOG_INFO, "HostCfgd: signal 'SIGTERM' is caught and exiting...")
        sys.exit(128 + sig)
    # Any other signal routed here is unexpected; log and carry on.
    syslog.syslog(syslog.LOG_INFO, "HostCfgd: invalid signal - ignoring..")
def run_cmd(cmd, log_err = True):
try:
subprocess.check_call(cmd, shell = True)
@ -31,6 +62,7 @@ def run_cmd(cmd, log_err = True):
syslog.syslog(syslog.LOG_ERR, "{} - failed: return code - {}, output:\n{}"
.format(err.cmd, err.returncode, err.output))
def is_true(val):
if val == 'True' or val == 'true':
return True
@ -74,10 +106,6 @@ class Iptables(object):
'''
return (isinstance(key, tuple))
def load(self, lpbk_table):
    """Apply iptables handling for every entry of the LOOPBACK_INTERFACE table."""
    for key, entry in lpbk_table.items():
        self.iptables_handler(key, entry)
def command(self, chain, ip, ver, op):
cmd = 'iptables' if ver == '4' else 'ip6tables'
cmd += ' -t mangle --{} {} -p tcp --tcp-flags SYN SYN'.format(op, chain)
@ -128,11 +156,8 @@ class Iptables(object):
for cmd in iptables_cmds:
syslog.syslog(syslog.LOG_INFO, "Running cmd - {}".format(cmd))
try:
subprocess.check_call(cmd, shell=True)
except subprocess.CalledProcessError as err:
syslog.syslog(syslog.LOG_ERR, "'{}' failed. RC: {}, output: {}"
.format(err.cmd, err.returncode, err.output))
run_cmd(cmd)
class AaaCfg(object):
def __init__(self):
@ -248,19 +273,17 @@ class KdumpCfg(object):
"num_dumps": "3" }
def load(self, kdump_table):
syslog.syslog(syslog.LOG_INFO, "KdumpCfg load ...")
data = {}
"""
Set the KDUMP table in CFG DB to kdump_defaults if not set by the user
"""
syslog.syslog(syslog.LOG_INFO, "KdumpCfg init...")
kdump_conf = kdump_table.get("config", {})
for row in self.kdump_defaults:
value = self.kdump_defaults.get(row)
if kdump_conf.get(row) is not None:
value = kdump_conf.get(row)
else:
if not kdump_conf.get(row):
self.config_db.mod_entry("KDUMP", "config", { row : value})
data[row] = value
self.kdump_update("config", data, True)
def kdump_update(self, key, data, isLoad):
def kdump_update(self, key, data):
syslog.syslog(syslog.LOG_INFO, "Kdump global configuration update")
if key == "config":
# Admin mode
@ -280,127 +303,109 @@ class KdumpCfg(object):
memory = self.kdump_defaults["memory"]
if data.get("memory") is not None:
memory = data.get("memory")
if isLoad or data.get("memory") is not None:
if data.get("memory") is not None:
run_cmd("sonic-kdump-config --memory " + memory)
# Num dumps
num_dumps = self.kdump_defaults["num_dumps"]
if data.get("num_dumps") is not None:
num_dumps = data.get("num_dumps")
if isLoad or data.get("num_dumps") is not None:
if data.get("num_dumps") is not None:
run_cmd("sonic-kdump-config --num_dumps " + num_dumps)
class NtpCfg(object):
def __init__(self, CfgDb):
self.config_db = CfgDb
"""
NtpCfg Config Daemon
1) ntp-config.service handles the configuration updates and then starts ntp.service
2) Both of them start after all the feature services start
3) Purpose of this daemon is to propagate runtime config changes in
NTP, NTP_SERVER and LOOPBACK_INTERFACE
"""
def __init__(self):
self.ntp_global = {}
self.has_ntp_servers = False
self.ntp_servers = set()
def load(self, ntp_global_conf, ntp_server_conf):
syslog.syslog(syslog.LOG_INFO, "NtpCfg load ...")
for row in ntp_global_conf:
self.ntp_global_update(row, ntp_global_conf[row], True)
self.ntp_server_update(0, ntp_server_conf, True)
def handle_ntp_source_intf_chg (self, key):
def handle_ntp_source_intf_chg(self, intf_name):
# if no ntp server configured, do nothing
if self.has_ntp_servers == False:
if not self.ntp_servers:
return
# check only the intf configured as source interface
if (len(self.ntp_global) == 0):
return
if 'src_intf' not in self.ntp_global:
return
if key[0] != self.ntp_global['src_intf']:
if intf_name not in self.ntp_global.get('src_intf', '').split(';'):
return
else:
# just restart ntp config
cmd = 'systemctl restart ntp-config'
run_cmd(cmd)
def ntp_global_update(self, key, data, isLoad):
syslog.syslog(syslog.LOG_INFO, "ntp global configuration update")
new_src = new_vrf = orig_src = orig_vrf = ""
if 'src_intf' in data:
new_src = data['src_intf']
if 'vrf' in data:
new_vrf = data['vrf']
if (len(self.ntp_global) != 0):
if 'src_intf' in self.ntp_global:
orig_src = self.ntp_global['src_intf']
if 'vrf' in self.ntp_global:
orig_vrf = self.ntp_global['vrf']
def ntp_global_update(self, key, data):
syslog.syslog(syslog.LOG_INFO, 'NTP GLOBAL Update')
orig_src = self.ntp_global.get('src_intf', '')
orig_src_set = set(orig_src.split(";"))
orig_vrf = self.ntp_global.get('vrf', '')
new_src = data.get('src_intf', '')
new_src_set = set(new_src.split(";"))
new_vrf = data.get('vrf', '')
# Update the Local Cache
self.ntp_global = data
# during initial load of ntp configuration, ntp server configuration decides if to restart ntp-config
if (isLoad):
syslog.syslog(syslog.LOG_INFO, "ntp global update in load")
return
# check if ntp server configured, if not, do nothing
if self.has_ntp_servers == False:
syslog.syslog(syslog.LOG_INFO, "no ntp server when global config change, do nothing")
return
if not self.ntp_servers:
syslog.syslog(syslog.LOG_INFO, "No ntp server when global config change, do nothing")
return
if (new_src != orig_src):
if orig_src_set != new_src_set:
syslog.syslog(syslog.LOG_INFO, "ntp global update for source intf old {} new {}, restarting ntp-config"
.format(orig_src, new_src))
.format(orig_src_set, new_src_set))
cmd = 'systemctl restart ntp-config'
run_cmd(cmd)
else:
if (new_vrf != orig_vrf):
syslog.syslog(syslog.LOG_INFO, "ntp global update for vrf old {} new {}, restarting ntp service"
.format(orig_vrf, new_vrf))
cmd = 'service ntp restart'
run_cmd(cmd)
elif new_vrf != orig_vrf:
syslog.syslog(syslog.LOG_INFO, "ntp global update for vrf old {} new {}, restarting ntp service"
.format(orig_vrf, new_vrf))
cmd = 'service ntp restart'
run_cmd(cmd)
def ntp_server_update(self, key, data, isLoad):
syslog.syslog(syslog.LOG_INFO, 'ntp server update key {} data {}'.format(key, data))
def ntp_server_update(self, key, op):
syslog.syslog(syslog.LOG_INFO, 'ntp server update key {}'.format(key))
# during load, restart ntp-config regardless if ntp server is configured or not
if isLoad == True:
if data != {}:
self.has_ntp_servers = True
else:
# for runtime ntp server change, to determine if there is ntp server configured, need to
# get from configDB, as delete triggers 2 event handling
ntp_servers_tbl = self.config_db.get_table('NTP_SERVER')
if ntp_servers_tbl != {}:
self.has_ntp_servers = True
else:
self.has_ntp_servers = False
restart_config = False
if op == "SET" and key not in self.ntp_servers:
restart_config = True
self.ntp_servers.add(key)
elif op == "DEL" and key in self.ntp_servers:
restart_config = True
self.ntp_servers.remove(key)
cmd = 'systemctl restart ntp-config'
syslog.syslog(syslog.LOG_INFO, 'ntp server update, restarting ntp-config, ntp server exists {}'.format(self.has_ntp_servers))
run_cmd(cmd)
if restart_config:
cmd = 'systemctl restart ntp-config'
syslog.syslog(syslog.LOG_INFO, 'ntp server update, restarting ntp-config, ntp servers configured {}'.format(self.ntp_servers))
run_cmd(cmd)
class HostConfigDaemon:
def __init__(self):
# Just a sanity check to verify if the CONFIG_DB has been initialized
# before moving forward
self.config_db = ConfigDBConnector()
self.config_db.connect(wait_for_init=True, retry_on=True)
self.dbconn = DBConnector(CFG_DB, 0)
self.selector = Select()
syslog.syslog(syslog.LOG_INFO, 'ConfigDB connect success')
self.select = Select()
self.callbacks = dict()
self.subscriber_map = dict()
# Load DEVICE metadata configurations
self.device_config = {}
self.device_config['DEVICE_METADATA'] = self.config_db.get_table('DEVICE_METADATA')
self.aaacfg = AaaCfg()
self.iptables = Iptables()
self.ntpcfg = NtpCfg(self.config_db)
self.ntpcfg = NtpCfg()
# Cache the values of 'state' field in 'FEATURE' table of each container
self.cached_feature_states = {}
@ -410,34 +415,16 @@ class HostConfigDaemon:
self.kdumpCfg = KdumpCfg(self.config_db)
self.kdumpCfg.load(self.config_db.get_table('KDUMP'))
def load(self):
    """Push the current ConfigDB contents into each sub-handler once at startup."""
    # AAA / TACACS+ configuration
    self.aaacfg.load(
        self.config_db.get_table('AAA'),
        self.config_db.get_table('TACPLUS'),
        self.config_db.get_table('TACPLUS_SERVER'),
    )
    # Loopback interface addresses drive the iptables rules
    self.iptables.load(self.config_db.get_table('LOOPBACK_INTERFACE'))
    # NTP global settings and the configured server list
    self.ntpcfg.load(
        self.config_db.get_table('NTP'),
        self.config_db.get_table('NTP_SERVER'),
    )
def get_target_state(self, feature_name, state):
    """Render the feature's (possibly Jinja2-templated) 'state' value against
    the cached device configuration, write the rendered value back to the
    FEATURE table in CONFIG_DB, and return it.
    """
    template = jinja2.Template(state)
    # 'state' may embed a template such as
    # "{% if ... %}enabled{% else %}disabled{% endif %}"; render it with
    # self.device_config (which holds DEVICE_METADATA).
    target_state = template.render(self.device_config)
    # Persist the concrete value so other FEATURE consumers never see the
    # raw template text.
    entry = self.config_db.get_entry('FEATURE', feature_name)
    entry["state"] = target_state
    self.config_db.set_entry("FEATURE", feature_name, entry)
    return target_state
def get_feature_attribute(self, feature_name, feature_table):
has_timer = ast.literal_eval(feature_table[feature_name].get('has_timer', 'False'))
has_global_scope = ast.literal_eval(feature_table[feature_name].get('has_global_scope', 'True'))
has_per_asic_scope = ast.literal_eval(feature_table[feature_name].get('has_per_asic_scope', 'False'))
def get_feature_attribute(self, feature_name, feature_cfg):
has_timer = safe_eval(feature_cfg.get('has_timer', 'False'))
has_global_scope = safe_eval(feature_cfg.get('has_global_scope', 'True'))
has_per_asic_scope = safe_eval(feature_cfg.get('has_per_asic_scope', 'False'))
# Create feature name suffix depending feature is running in host or namespace or in both
feature_names = (
@ -514,119 +501,135 @@ class HostConfigDaemon:
syslog.syslog(syslog.LOG_ERR, "Feature '{}' failed to be stopped and disabled".format(feature_name))
return
def is_invariant_feature(self, feature_name, state, feature_table):
invariant_feature = self.cached_feature_states[feature_name] == "always_enabled" or \
self.cached_feature_states[feature_name] == "always_disabled"
if invariant_feature:
invariant_state = self.cached_feature_states[feature_name]
if state != invariant_state:
syslog.syslog(syslog.LOG_INFO, "Feature '{}' service is '{}'"
.format(feature_name, invariant_state))
entry = self.config_db.get_entry('FEATURE', feature_name)
entry['state'] = invariant_state
self.config_db.set_entry('FEATURE', feature_name, entry)
def update_feature_state(self, feature_name, state, feature_cfg):
cached_feature_state = self.cached_feature_states.get(feature_name, None)
enable = False
disable = False
if state == "always_disabled":
feature_names, feature_suffixes = self.get_feature_attribute(feature_name, feature_table)
self.disable_feature(feature_names, feature_suffixes)
syslog.syslog(syslog.LOG_INFO, "Feature '{}' is stopped and disabled".format(feature_name))
# Allowed transitions:
# None -> always_enabled
# -> always_disabled
# -> enabled
# -> disabled
# always_enabled -> always_disabled
# enabled -> disabled
# disabled -> enabled
if cached_feature_state is None:
enable = state in ("always_enabled", "enabled")
disable = state in ("always_disabled", "disabled")
elif cached_feature_state in ("always_enabled", "always_disabled"):
disable = state == "always_disabled"
enable = state == "always_enabled"
elif cached_feature_state in ("enabled", "disabled"):
enable = state == "enabled"
disable = state == "disabled"
else:
syslog.syslog(syslog.LOG_INFO, "Feature {} service is {}".format(feature_name, cached_feature_state))
return False
return invariant_feature
if not enable and not disable:
syslog.syslog(syslog.LOG_ERR, "Unexpected state value '{}' for feature {}"
.format(state, feature_name))
return False
def update_feature_state(self, feature_name, state, feature_table):
if not self.is_invariant_feature(feature_name, state, feature_table):
self.cached_feature_states[feature_name] = state
feature_names, feature_suffixes = self.get_feature_attribute(feature_name, feature_cfg)
feature_names, feature_suffixes = self.get_feature_attribute(feature_name, feature_table)
if state == "enabled":
self.enable_feature(feature_names, feature_suffixes)
syslog.syslog(syslog.LOG_INFO, "Feature '{}.{}' is enabled and started"
.format(feature_name, feature_suffixes[-1]))
elif state == "disabled":
self.disable_feature(feature_names, feature_suffixes)
syslog.syslog(syslog.LOG_INFO, "Feature '{}' is stopped and disabled".format(feature_name))
else:
syslog.syslog(syslog.LOG_ERR, "Unexpected state value '{}' for feature '{}'"
.format(state, feature_name))
if enable:
self.enable_feature(feature_names, feature_suffixes)
syslog.syslog(syslog.LOG_INFO, "Feature {} is enabled and started".format(feature_name))
def update_all_feature_states(self):
if disable:
self.disable_feature(feature_names, feature_suffixes)
syslog.syslog(syslog.LOG_INFO, "Feature {} is stopped and disabled".format(feature_name))
return True
def render_all_feature_states(self):
"""
Render the Template (if any) for the state field of the FEATURE Table.
Update the rendered state in the config db
"""
feature_table = self.config_db.get_table('FEATURE')
for feature_name in feature_table:
if not feature_name:
syslog.syslog(syslog.LOG_WARNING, "Feature is None")
continue
state = feature_table[feature_name]['state']
state = feature_table.get(feature_name, {}).get('state', '')
if not state:
syslog.syslog(syslog.LOG_WARNING, "Enable state of feature '{}' is None".format(feature_name))
continue
target_state = self.get_target_state(feature_name, state)
# Store the initial value of 'state' field in 'FEATURE' table of a specific container
self.cached_feature_states[feature_name] = target_state
self.set_target_state(feature_name, state)
def set_target_state(self, feature_name, state):
    """Render the (possibly Jinja2-templated) 'state' and persist it to the
    FEATURE table in CONFIG_DB."""
    rendered = jinja2.Template(state).render(self.device_config)
    feature_entry = self.config_db.get_entry('FEATURE', feature_name)
    feature_entry["state"] = rendered
    self.config_db.set_entry("FEATURE", feature_name, feature_entry)
self.update_feature_state(feature_name, target_state, feature_table)
def __get_intf_name(self, key):
    """Extract the interface name from a ConfigDB key.

    Interface keys may be plain names or (name, ip_prefix) tuples; return
    the name element either way.
    """
    return key[0] if isinstance(key, tuple) and key else key
def aaa_handler(self, key, data):
def aaa_handler(self, key, op, data):
    """Forward an AAA table event to AaaCfg and log it."""
    self.aaacfg.aaa_update(key, data)
    syslog.syslog(syslog.LOG_INFO, 'AAA Update: key: {}, op: {}, data: {}'.format(key, op, data))
def tacacs_server_handler(self, key, data):
def tacacs_server_handler(self, key, op, data):
    """Forward a TACPLUS_SERVER table event to AaaCfg; log it with the
    passkey obfuscated so the shared secret never reaches syslog."""
    self.aaacfg.tacacs_server_update(key, data)
    # Deep-copy before mutating so the caller's data is untouched
    log_data = copy.deepcopy(data)
    if 'passkey' in log_data:
        log_data['passkey'] = obfuscate(log_data['passkey'])
    syslog.syslog(syslog.LOG_INFO, 'TACPLUS_SERVER update: key: {}, op: {}, data: {}'.format(key, op, log_data))
def tacacs_global_handler(self, key, data):
def tacacs_global_handler(self, key, op, data):
    """Forward a TACPLUS (global) table event to AaaCfg; log it with the
    passkey obfuscated so the shared secret never reaches syslog."""
    self.aaacfg.tacacs_global_update(key, data)
    # Deep-copy before mutating so the caller's data is untouched
    log_data = copy.deepcopy(data)
    if 'passkey' in log_data:
        log_data['passkey'] = obfuscate(log_data['passkey'])
    syslog.syslog(syslog.LOG_INFO, 'TACPLUS Global update: key: {}, op: {}, data: {}'.format(key, op, log_data))
def lpbk_handler(self, key, data):
def lpbk_handler(self, key, op, data):
    """Handle a LOOPBACK_INTERFACE event: update the iptables rules for the
    address and kick the NTP source-interface logic.

    The SubscriberStateTable op ("SET"/"DEL") tells us directly whether the
    entry was added or removed, so no ConfigDB read-back (the pre-refactor
    get_keys() probe) is needed.
    """
    key = ConfigDBConnector.deserialize_key(key)
    add = op != "DEL"
    self.iptables.iptables_handler(key, data, add)
    lpbk_name = self.__get_intf_name(key)
    self.ntpcfg.handle_ntp_source_intf_chg(lpbk_name)
def feature_state_handler(self, key, data):
feature_name = key
feature_table = self.config_db.get_table('FEATURE')
if feature_name not in feature_table:
syslog.syslog(syslog.LOG_WARNING, "Feature '{}' not in FEATURE table".format(feature_name))
return
state = feature_table[feature_name]['state']
def feature_state_handler(self, feature_name, op, feature_cfg):
    """Handle a FEATURE table event for a single feature.

    An empty feature_cfg means the entry was deleted: drop it from the
    cache and stop. Otherwise enable/disable the feature's services only
    when its 'state' differs from the cached value.
    """
    if not feature_cfg:
        syslog.syslog(syslog.LOG_WARNING, "Deregistering feature {}".format(feature_name))
        # pop with a default: a DEL for a feature we never cached must not raise
        self.cached_feature_states.pop(feature_name, None)
        # A deleted entry has no state to act on -- bail out here instead of
        # falling through and emitting a misleading "state is None" warning.
        return
    state = feature_cfg.get("state", "")
    if not state:
        syslog.syslog(syslog.LOG_WARNING, "Enable state of feature '{}' is None".format(feature_name))
        return
    # Enable/disable the container service only on an actual state change.
    if self.cached_feature_states.get(feature_name, "") != state:
        self.update_feature_state(feature_name, state, feature_cfg)
        self.cached_feature_states[feature_name] = state
def ntp_server_handler (self, key, data):
syslog.syslog(syslog.LOG_INFO, 'NTP server handler...')
ntp_server_db = self.config_db.get_table('NTP_SERVER')
data = ntp_server_db
self.ntpcfg.ntp_server_update(key, data, False)
def ntp_server_handler (self, key, op, data):
self.ntpcfg.ntp_server_update(key, op)
def ntp_global_handler (self, key, data):
syslog.syslog(syslog.LOG_INFO, 'NTP global handler...')
self.ntpcfg.ntp_global_update(key, data, False)
def ntp_global_handler (self, key, op, data):
self.ntpcfg.ntp_global_update(key, data)
def kdump_handler (self, key, data):
def kdump_handler (self, key, op, data):
    """Forward a KDUMP table event to KdumpCfg."""
    syslog.syslog(syslog.LOG_INFO, 'Kdump handler...')
    self.kdumpCfg.kdump_update(key, data)
def wait_till_system_init_done(self):
@ -635,36 +638,70 @@ class HostConfigDaemon:
systemctl_cmd = "sudo systemctl is-system-running --wait --quiet"
subprocess.call(systemctl_cmd, shell=True)
def start(self):
def subscribe(self, table, callback, pri):
    """Register callback for ConfigDB table change events at priority pri.

    A SubscriberStateTable is created once per table (on the first
    subscription); subsequent callbacks for the same table reuse it. The
    legacy ConfigDBConnector.subscribe() registrations that were left
    interleaved in this body belong to the removed listen() code path and
    are gone -- all events now flow through the Select loop in start().
    """
    try:
        if table not in self.callbacks:
            self.callbacks[table] = []
            subscriber = SubscriberStateTable(self.dbconn, table, TableConsumable.DEFAULT_POP_BATCH_SIZE, pri)
            self.selector.addSelectable(subscriber)  # Add to the Selector
            # Maintain a mapping b/w subscriber & fd for dispatch in start()
            self.subscriber_map[subscriber.getFd()] = (subscriber, table)
        self.callbacks[table].append(callback)
    except Exception as err:
        syslog.syslog(syslog.LOG_ERR, "Subscribe to table {} failed with error {}".format(table, err))
def register_callbacks(self):
self.subscribe('KDUMP', lambda table, key, op, data: self.kdump_handler(key, op, data), HOSTCFGD_MAX_PRI)
# Handle FEATURE updates before other tables
self.subscribe('FEATURE', lambda table, key, op, data: self.feature_state_handler(key, op, data), HOSTCFGD_MAX_PRI-1)
# Handle AAA and TACACS related tables
self.subscribe('AAA', lambda table, key, op, data: self.aaa_handler(key, op, data), HOSTCFGD_MAX_PRI-2)
self.subscribe('TACPLUS_SERVER', lambda table, key, op, data: self.tacacs_server_handler(key, op, data), HOSTCFGD_MAX_PRI-2)
self.subscribe('TACPLUS', lambda table, key, op, data: self.tacacs_global_handler(key, op, data), HOSTCFGD_MAX_PRI-2)
self.subscribe('LOOPBACK_INTERFACE', lambda table, key, op, data: self.lpbk_handler(key, op, data), HOSTCFGD_MAX_PRI-2)
# Handle NTP & NTP_SERVER updates
self.subscribe('NTP_SERVER', lambda table, key, op, data: self.ntp_server_handler(key, op, data), HOSTCFGD_MAX_PRI-3)
self.subscribe('NTP', lambda table, key, op, data: self.ntp_global_handler(key, op, data), HOSTCFGD_MAX_PRI-3)
syslog.syslog(syslog.LOG_INFO,
"Waiting for systemctl to finish initialization")
self.wait_till_system_init_done()
syslog.syslog(syslog.LOG_INFO,
"systemctl has finished initialization -- proceeding ...")
# Update all feature states once upon starting
self.update_all_feature_states()
def start(self):
    """Main event loop: wait on the Select object and dispatch each popped
    (key, op, fvs) event to the callbacks registered for its table.

    The pre-refactor load()/config_db.listen() calls that were left
    interleaved here belong to the removed ConfigDB-listen code path and
    must not run inside this loop.
    """
    while True:
        state, selectable_ = self.selector.select(DEFAULT_SELECT_TIMEOUT)
        if state == self.selector.TIMEOUT:
            continue
        elif state == self.selector.ERROR:
            syslog.syslog(syslog.LOG_ERR,
                          "error returned by select")
            continue

        fd = selectable_.getFd()
        # Get the corresponding subscriber & table
        subscriber, table = self.subscriber_map.get(fd, (None, ""))
        if not subscriber:
            # Bug fix: was a bare 'subscriber_map', which raised NameError
            # on this error path instead of logging it.
            syslog.syslog(syslog.LOG_ERR,
                          "No Subscriber object found for fd: {}, subscriber map: {}".format(fd, self.subscriber_map))
            continue
        key, op, fvs = subscriber.pop()
        # Dispatch to every registered callback; guard against a table with
        # no registered callbacks so we don't iterate None.
        cbs = self.callbacks.get(table, None)
        for callback in (cbs or []):
            callback(table, key, op, dict(fvs))
def main():
signal.signal(signal.SIGTERM, signal_handler)
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGHUP, signal_handler)
daemon = HostConfigDaemon()
daemon.render_all_feature_states()
daemon.register_callbacks()
daemon.load()
daemon.start()
if __name__ == "__main__":
main()

View File

@ -29,7 +29,8 @@ setup(
'parameterized',
'pytest',
'pyfakefs',
'sonic-py-common'
'sonic-py-common',
'deepdiff'
],
classifiers = [
'Development Status :: 3 - Alpha',

View File

@ -12,6 +12,14 @@ class MockConfigDb(object):
def set_config_db(test_config_db):
MockConfigDb.CONFIG_DB = test_config_db
@staticmethod
def deserialize_key(key, separator="|"):
    """Split a serialized ConfigDB key on separator.

    Multi-part keys come back as a tuple; single-part keys are returned
    unchanged as the bare string.
    """
    parts = key.split(separator)
    return tuple(parts) if len(parts) > 1 else key
@staticmethod
def get_config_db():
return MockConfigDb.CONFIG_DB
@ -25,8 +33,85 @@ class MockConfigDb(object):
def get_entry(self, key, field):
return MockConfigDb.CONFIG_DB[key][field]
def mod_entry(self, key, field, data):
existing_data = self.get_entry(key, field)
existing_data.update(data)
self.set_entry(key, field, existing_data)
def set_entry(self, key, field, data):
MockConfigDb.CONFIG_DB[key][field] = data
def get_table(self, table_name):
return MockConfigDb.CONFIG_DB[table_name]
class MockSelect():
    """Test double for the swsscommon Select object.

    Events are fed through a class-level queue of (table, key) pairs;
    select() pops one, primes the matching subscriber with the key and
    returns that subscriber. An empty queue raises TimeoutError so tests
    can break out of the daemon's infinite loop.
    """

    event_queue = []

    @staticmethod
    def set_event_queue(Q):
        MockSelect.event_queue = Q

    @staticmethod
    def get_event_queue():
        return MockSelect.event_queue

    @staticmethod
    def reset_event_queue():
        MockSelect.event_queue = []

    def __init__(self):
        # table name -> registered subscriber
        self.sub_map = {}
        self.TIMEOUT = "TIMEOUT"
        self.ERROR = "ERROR"

    def addSelectable(self, subscriber):
        self.sub_map[subscriber.table] = subscriber

    def select(self, TIMEOUT):
        pending = MockSelect.get_event_queue()
        if not pending:
            raise TimeoutError
        table, key = pending.pop(0)
        chosen = self.sub_map[table]
        chosen.nextKey(key)
        return "OBJECT", chosen
class MockSubscriberStateTable():
    """Test double for swsscommon SubscriberStateTable, backed by MockConfigDb.

    Fake file descriptors are handed out from a monotonically increasing
    class counter so each instance gets a unique fd, mirroring how the real
    Select loop keys subscribers by fd.
    """

    FD_INIT = 0

    @staticmethod
    def generate_fd():
        fd = MockSubscriberStateTable.FD_INIT
        MockSubscriberStateTable.FD_INIT = fd + 1
        return fd

    @staticmethod
    def reset_fd():
        MockSubscriberStateTable.FD_INIT = 0

    def __init__(self, conn, table, pop, pri):
        # conn/pop/pri mirror the real signature but are unused by the mock
        self.fd = MockSubscriberStateTable.generate_fd()
        self.next_key = ''
        self.table = table

    def getFd(self):
        return self.fd

    def nextKey(self, key):
        # Primed by MockSelect.select() just before the daemon calls pop()
        self.next_key = key

    def pop(self):
        """Emulate popping one event: SET with the row's fields when the
        primed key exists in MockConfigDb, DEL with empty fields otherwise."""
        rows = MockConfigDb.CONFIG_DB.get(self.table, {})
        if self.next_key in rows:
            op, fvs = "SET", rows.get(self.next_key, {})
        else:
            op, fvs = "DEL", {}
        return self.next_key, op, fvs
class MockDBConnector():
    """Test double for swsscommon DBConnector; the connection parameters
    are accepted for signature compatibility and ignored."""

    def __init__(self, db, val):
        pass

View File

@ -1,16 +1,20 @@
import os
import sys
import swsscommon
import swsscommon as swsscommon_package
from swsscommon import swsscommon
from parameterized import parameterized
from sonic_py_common.general import load_module_from_source
from unittest import TestCase, mock
from .test_vectors import HOSTCFGD_TEST_VECTOR
from tests.common.mock_configdb import MockConfigDb
from .test_vectors import HOSTCFGD_TEST_VECTOR, HOSTCFG_DAEMON_CFG_DB
from tests.common.mock_configdb import MockConfigDb, MockSubscriberStateTable
from tests.common.mock_configdb import MockSelect, MockDBConnector
from pyfakefs.fake_filesystem_unittest import patchfs
from deepdiff import DeepDiff
from unittest.mock import call
swsscommon.swsscommon.ConfigDBConnector = MockConfigDb
test_path = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
modules_path = os.path.dirname(test_path)
scripts_path = os.path.join(modules_path, "scripts")
@ -19,7 +23,10 @@ sys.path.insert(0, modules_path)
# Load the file under test
hostcfgd_path = os.path.join(scripts_path, 'hostcfgd')
hostcfgd = load_module_from_source('hostcfgd', hostcfgd_path)
hostcfgd.ConfigDBConnector = MockConfigDb
hostcfgd.SubscriberStateTable = MockSubscriberStateTable
hostcfgd.Select = MockSelect
hostcfgd.DBConnector = MockDBConnector
class TestHostcfgd(TestCase):
"""
@ -38,21 +45,13 @@ class TestHostcfgd(TestCase):
Returns:
None
"""
is_equal = len(table) == len(expected_table)
if is_equal:
for key, fields in expected_table.items():
is_equal = is_equal and key in table and len(fields) == len(table[key])
if is_equal:
for field, value in fields.items():
is_equal = is_equal and value == table[key][field]
if not is_equal:
break;
else:
break
return is_equal
ddiff = DeepDiff(table, expected_table, ignore_order=True)
print('DIFF:', ddiff)
return True if not ddiff else False
@parameterized.expand(HOSTCFGD_TEST_VECTOR)
def test_hostcfgd(self, test_name, test_data):
@patchfs
def test_hostcfgd(self, test_name, test_data, fs):
"""
Test hostcfd daemon initialization
@ -63,6 +62,8 @@ class TestHostcfgd(TestCase):
Returns:
None
"""
print(swsscommon_package.__path__)
fs.add_real_paths(swsscommon_package.__path__) # add real path of swsscommon for database_config.json
MockConfigDb.set_config_db(test_data["config_db"])
with mock.patch("hostcfgd.subprocess") as mocked_subprocess:
popen_mock = mock.Mock()
@ -71,9 +72,157 @@ class TestHostcfgd(TestCase):
mocked_subprocess.Popen.return_value = popen_mock
host_config_daemon = hostcfgd.HostConfigDaemon()
host_config_daemon.update_all_feature_states()
host_config_daemon.render_all_feature_states()
features = MockConfigDb.CONFIG_DB['FEATURE']
for key, fvs in features.items():
host_config_daemon.feature_state_handler(key, 'SET', fvs)
assert self.__verify_table(
MockConfigDb.get_config_db()["FEATURE"],
test_data["expected_config_db"]["FEATURE"]
), "Test failed for test data: {0}".format(test_data)
mocked_subprocess.check_call.assert_has_calls(test_data["expected_subprocess_calls"], any_order=True)
class TesNtpCfgd(TestCase):
    """
    Test hostcfd daemon - NtpCfgd
    """
    def setUp(self):
        # Seed the mock ConfigDB with one NTP global row and one server row.
        MockConfigDb.CONFIG_DB['NTP'] = {'global': {'vrf': 'mgmt', 'src_intf': 'eth0'}}
        MockConfigDb.CONFIG_DB['NTP_SERVER'] = {'0.debian.pool.ntp.org': {}}

    def tearDown(self):
        # MockConfigDb keeps class-level state; reset so tests stay independent.
        MockConfigDb.CONFIG_DB = {}

    def test_ntp_global_update_with_no_servers(self):
        # A global-config change with no servers cached in NtpCfg must not
        # restart any service.
        with mock.patch('hostcfgd.subprocess') as mocked_subprocess:
            popen_mock = mock.Mock()
            attrs = {'communicate.return_value': ('output', 'error')}
            popen_mock.configure_mock(**attrs)
            mocked_subprocess.Popen.return_value = popen_mock

            ntpcfgd = hostcfgd.NtpCfg()
            ntpcfgd.ntp_global_update('global', MockConfigDb.CONFIG_DB['NTP']['global'])
            mocked_subprocess.check_call.assert_not_called()

    def test_ntp_global_update_ntp_servers(self):
        # Adding a server (SET) after a global update should restart ntp-config.
        with mock.patch('hostcfgd.subprocess') as mocked_subprocess:
            popen_mock = mock.Mock()
            attrs = {'communicate.return_value': ('output', 'error')}
            popen_mock.configure_mock(**attrs)
            mocked_subprocess.Popen.return_value = popen_mock

            ntpcfgd = hostcfgd.NtpCfg()
            ntpcfgd.ntp_global_update('global', MockConfigDb.CONFIG_DB['NTP']['global'])
            ntpcfgd.ntp_server_update('0.debian.pool.ntp.org', 'SET')
            mocked_subprocess.check_call.assert_has_calls([call('systemctl restart ntp-config', shell=True)])

    def test_loopback_update(self):
        # A loopback change on the configured source interface should restart
        # ntp-config when servers are present.
        with mock.patch('hostcfgd.subprocess') as mocked_subprocess:
            popen_mock = mock.Mock()
            attrs = {'communicate.return_value': ('output', 'error')}
            popen_mock.configure_mock(**attrs)
            mocked_subprocess.Popen.return_value = popen_mock

            ntpcfgd = hostcfgd.NtpCfg()
            # Pre-populate the NtpCfg cache directly (bypassing the handlers).
            ntpcfgd.ntp_global = MockConfigDb.CONFIG_DB['NTP']['global']
            ntpcfgd.ntp_servers.add('0.debian.pool.ntp.org')

            ntpcfgd.handle_ntp_source_intf_chg('eth0')
            mocked_subprocess.check_call.assert_has_calls([call('systemctl restart ntp-config', shell=True)])
class TestHostcfgdDaemon(TestCase):
    """End-to-end tests: drive HostConfigDaemon.start() with queued mock
    events and assert on the subprocess commands it issues. The daemon loop
    ends when MockSelect exhausts its queue and raises TimeoutError."""

    def setUp(self):
        MockConfigDb.set_config_db(HOSTCFG_DAEMON_CFG_DB)

    def tearDown(self):
        # Reset shared class-level mock state between tests.
        MockConfigDb.CONFIG_DB = {}

    @patchfs
    def test_feature_events(self, fs):
        # Queue one event per feature defined in the test vector.
        MockSelect.event_queue = [('FEATURE', 'dhcp_relay'),
                                  ('FEATURE', 'mux'),
                                  ('FEATURE', 'telemetry')]
        daemon = hostcfgd.HostConfigDaemon()
        daemon.render_all_feature_states()
        daemon.register_callbacks()
        with mock.patch('hostcfgd.subprocess') as mocked_subprocess:
            popen_mock = mock.Mock()
            attrs = {'communicate.return_value': ('output', 'error')}
            popen_mock.configure_mock(**attrs)
            mocked_subprocess.Popen.return_value = popen_mock
            try:
                daemon.start()
            except TimeoutError:
                # Raised by MockSelect when the event queue is drained.
                pass
            # Enabling each feature unmasks/enables/starts its unit; telemetry
            # has has_timer=True so its .timer unit is driven instead.
            expected = [call('sudo systemctl unmask dhcp_relay.service', shell=True),
                        call('sudo systemctl enable dhcp_relay.service', shell=True),
                        call('sudo systemctl start dhcp_relay.service', shell=True),
                        call('sudo systemctl unmask mux.service', shell=True),
                        call('sudo systemctl enable mux.service', shell=True),
                        call('sudo systemctl start mux.service', shell=True),
                        call('sudo systemctl unmask telemetry.service', shell=True),
                        call('sudo systemctl unmask telemetry.timer', shell=True),
                        call('sudo systemctl enable telemetry.timer', shell=True),
                        call('sudo systemctl start telemetry.timer', shell=True)]
            mocked_subprocess.check_call.assert_has_calls(expected)

            # Change the state to disabled
            MockConfigDb.CONFIG_DB['FEATURE']['telemetry']['state'] = 'disabled'
            MockSelect.event_queue = [('FEATURE', 'telemetry')]
            try:
                daemon.start()
            except TimeoutError:
                pass
            # Disabling stops/disables/masks both the timer and the service.
            expected = [call('sudo systemctl stop telemetry.timer', shell=True),
                        call('sudo systemctl disable telemetry.timer', shell=True),
                        call('sudo systemctl mask telemetry.timer', shell=True),
                        call('sudo systemctl stop telemetry.service', shell=True),
                        call('sudo systemctl disable telemetry.service', shell=True),
                        call('sudo systemctl mask telemetry.service', shell=True)]
            mocked_subprocess.check_call.assert_has_calls(expected)

    def test_loopback_events(self):
        MockConfigDb.set_config_db(HOSTCFG_DAEMON_CFG_DB)
        # NTP global + server + a loopback address event, in order.
        MockSelect.event_queue = [('NTP', 'global'),
                                  ('NTP_SERVER', '0.debian.pool.ntp.org'),
                                  ('LOOPBACK_INTERFACE', 'Loopback0|10.184.8.233/32')]
        daemon = hostcfgd.HostConfigDaemon()
        daemon.register_callbacks()
        with mock.patch('hostcfgd.subprocess') as mocked_subprocess:
            popen_mock = mock.Mock()
            attrs = {'communicate.return_value': ('output', 'error')}
            popen_mock.configure_mock(**attrs)
            mocked_subprocess.Popen.return_value = popen_mock
            try:
                daemon.start()
            except TimeoutError:
                pass
            # The loopback SET installs MSS-clamp rules for the new address.
            expected = [call('systemctl restart ntp-config', shell=True),
                        call('iptables -t mangle --append PREROUTING -p tcp --tcp-flags SYN SYN -d 10.184.8.233 -j TCPMSS --set-mss 1460', shell=True),
                        call('iptables -t mangle --append POSTROUTING -p tcp --tcp-flags SYN SYN -s 10.184.8.233 -j TCPMSS --set-mss 1460', shell=True)]
            mocked_subprocess.check_call.assert_has_calls(expected, any_order=True)

    def test_kdump_event(self):
        MockConfigDb.set_config_db(HOSTCFG_DAEMON_CFG_DB)
        daemon = hostcfgd.HostConfigDaemon()
        daemon.register_callbacks()
        assert MockConfigDb.CONFIG_DB['KDUMP']['config']
        MockSelect.event_queue = [('KDUMP', 'config')]
        with mock.patch('hostcfgd.subprocess') as mocked_subprocess:
            popen_mock = mock.Mock()
            attrs = {'communicate.return_value': ('output', 'error')}
            popen_mock.configure_mock(**attrs)
            mocked_subprocess.Popen.return_value = popen_mock
            try:
                daemon.start()
            except TimeoutError:
                pass
            # An empty KDUMP|config row applies the kdump defaults.
            expected = [call('sonic-kdump-config --disable', shell=True),
                        call('sonic-kdump-config --num_dumps 3', shell=True),
                        call('sonic-kdump-config --memory 0M-2G:256M,2G-4G:320M,4G-8G:384M,8G-:448M', shell=True)]
            mocked_subprocess.check_call.assert_has_calls(expected, any_order=True)

View File

@ -381,3 +381,62 @@ HOSTCFGD_TEST_VECTOR = [
}
]
]
HOSTCFG_DAEMON_CFG_DB = {
"FEATURE": {
"dhcp_relay": {
"auto_restart": "enabled",
"has_global_scope": "True",
"has_per_asic_scope": "False",
"has_timer": "False",
"high_mem_alert": "disabled",
"set_owner": "kube",
"state": "{% if not (DEVICE_METADATA is defined and DEVICE_METADATA['localhost'] is defined and DEVICE_METADATA['localhost']['type'] is defined and DEVICE_METADATA['localhost']['type'] != 'ToRRouter') %}enabled{% else %}disabled{% endif %}"
},
"mux": {
"auto_restart": "enabled",
"has_global_scope": "True",
"has_per_asic_scope": "False",
"has_timer": "False",
"high_mem_alert": "disabled",
"set_owner": "local",
"state": "{% if 'subtype' in DEVICE_METADATA['localhost'] and DEVICE_METADATA['localhost']['subtype'] == 'DualToR' %}enabled{% else %}always_disabled{% endif %}"
},
"telemetry": {
"auto_restart": "enabled",
"has_global_scope": "True",
"has_per_asic_scope": "False",
"has_timer": "True",
"high_mem_alert": "disabled",
"set_owner": "kube",
"state": "enabled",
"status": "enabled"
},
},
"KDUMP": {
"config": {
}
},
"NTP": {
"global": {
"vrf": "default",
"src_intf": "eth0;Loopback0"
}
},
"NTP_SERVER": {
"0.debian.pool.ntp.org": {}
},
"LOOPBACK_INTERFACE": {
"Loopback0|10.184.8.233/32": {
"scope": "global",
"family": "IPv4"
}
},
"DEVICE_METADATA": {
"localhost": {
"subtype": "DualToR",
"type": "ToRRouter",
}
}
}