import typing from natsort import natsorted import datetime import pickle import os import copy import click from tabulate import tabulate import utilities_common.multi_asic as multi_asic_util from swsscommon.swsscommon import CounterTable, MacsecCounter from utilities_common.cli import UserCache CACHE_MANAGER = UserCache(app_name="macsec") CACHE_FILE = os.path.join(CACHE_MANAGER.get_directory(), "macsecstats{}") DB_CONNECTOR = None COUNTER_TABLE = None class MACsecCfgMeta(object): def __init__(self, *args) -> None: SEPARATOR = DB_CONNECTOR.get_db_separator(DB_CONNECTOR.CONFIG_DB) self.key = self.__class__.get_cfg_table_name() + SEPARATOR + \ SEPARATOR.join(args) self.cfgMeta = DB_CONNECTOR.get_all( DB_CONNECTOR.CONFIG_DB, self.key) if len(self.cfgMeta) == 0: raise ValueError("No such MACsecCfgMeta: {}".format(self.key)) for k, v in self.cfgMeta.items(): setattr(self, k, v) class MACsecAppMeta(object): def __init__(self, *args) -> None: SEPARATOR = DB_CONNECTOR.get_db_separator(DB_CONNECTOR.APPL_DB) self.key = self.__class__.get_appl_table_name() + SEPARATOR + \ SEPARATOR.join(args) self.meta = DB_CONNECTOR.get_all( DB_CONNECTOR.APPL_DB, self.key) if len(self.meta) == 0: raise ValueError("No such MACsecAppMeta: {}".format(self.key)) for k, v in self.meta.items(): setattr(self, k, v) class MACsecCounters(object): def __init__(self, *args) -> None: _, fvs = COUNTER_TABLE.get(MacsecCounter(), ":".join(args)) self.counters = dict(fvs) class MACsecSA(MACsecAppMeta, MACsecCounters): def __init__(self, port_name: str, sci: str, an: str) -> None: self.port_name = port_name self.sci = sci self.an = an MACsecAppMeta.__init__(self, port_name, sci, an) MACsecCounters.__init__(self, port_name, sci, an) def dump_str(self, cache = None) -> str: buffer = self.get_header() meta = sorted(self.meta.items(), key=lambda x: x[0]) counters = copy.deepcopy(self.counters) if cache: for k, v in counters.items(): if k in cache.counters: counters[k] = int(counters[k]) - int(cache.counters[k]) counters = sorted(counters.items(), key=lambda x: x[0]) buffer += tabulate(meta + counters) buffer = "\n".join(["\t\t" + line for line in buffer.splitlines()]) return buffer class MACsecIngressSA(MACsecSA): def __init__(self, port_name: str, sci: str, an: str) -> None: super(MACsecIngressSA, self).__init__(port_name, sci, an) @classmethod def get_appl_table_name(cls) -> str: return "MACSEC_INGRESS_SA_TABLE" def get_header(self): return "MACsec Ingress SA ({})\n".format(self.an) class MACsecEgressSA(MACsecSA): def __init__(self, port_name: str, sci: str, an: str) -> None: super(MACsecEgressSA, self).__init__(port_name, sci, an) @classmethod def get_appl_table_name(cls) -> str: return "MACSEC_EGRESS_SA_TABLE" def get_header(self): return "MACsec Egress SA ({})\n".format(self.an) class MACsecSC(MACsecAppMeta): def __init__(self, port_name: str, sci: str) -> None: self.port_name = port_name self.sci = sci super(MACsecSC, self).__init__(port_name, sci) class MACsecIngressSC(MACsecSC): def __init__(self, port_name: str, sci: str) -> None: super(MACsecIngressSC, self).__init__(port_name, sci) @classmethod def get_appl_table_name(cls) -> str: return "MACSEC_INGRESS_SC_TABLE" def dump_str(self, cache = None) -> str: buffer = self.get_header() buffer = "\n".join(["\t" + line for line in buffer.splitlines()]) return buffer def get_header(self): return "MACsec Ingress SC ({})\n".format(self.sci) class MACsecEgressSC(MACsecSC): def __init__(self, port_name: str, sci: str) -> None: super(MACsecEgressSC, self).__init__(port_name, sci) @classmethod def get_appl_table_name(cls) -> str: return "MACSEC_EGRESS_SC_TABLE" def dump_str(self, cache = None) -> str: buffer = self.get_header() buffer += tabulate(sorted(self.meta.items(), key=lambda x: x[0])) buffer = "\n".join(["\t" + line for line in buffer.splitlines()]) return buffer def get_header(self): return "MACsec Egress SC ({})\n".format(self.sci) class MACsecPort(MACsecAppMeta, MACsecCfgMeta): def __init__(self, port_name: str) -> None: self.port_name = port_name MACsecAppMeta.__init__(self, port_name) MACsecCfgMeta.__init__(self, port_name) @classmethod def get_appl_table_name(cls) -> str: return "MACSEC_PORT_TABLE" @classmethod def get_cfg_table_name(cls) -> str: return "PORT" def dump_str(self, cache = None) -> str: buffer = self.get_header() # Add the profile information to the meta dict from config meta dict self.meta["profile"] = self.cfgMeta["macsec"] buffer += tabulate(sorted(self.meta.items(), key=lambda x: x[0])) return buffer def get_header(self) -> str: return "MACsec port({})\n".format(self.port_name) class MACsecProfile(MACsecCfgMeta): def __init__(self, profile_name: str) -> None: self.profile_name = profile_name super(MACsecProfile, self).__init__(profile_name) @classmethod def get_cfg_table_name(cls) -> str: return "MACSEC_PROFILE" def dump_str(self, cache = None) -> str: buffer = self.get_header() # Don't display the primary and fallback CAK if 'primary_cak' in self.cfgMeta: del self.cfgMeta['primary_cak'] if 'fallback_cak' in self.cfgMeta: del self.cfgMeta['fallback_cak'] t_buffer = tabulate(sorted(self.cfgMeta.items(), key=lambda x: x[0])) t_buffer = "\n".join(["\t" + line for line in t_buffer.splitlines()]) buffer += t_buffer return buffer def get_header(self) -> str: return "MACsec profile : {}\n".format(self.profile_name) def create_macsec_obj(key: str) -> MACsecAppMeta: attr = key.split(":") try: if attr[0] == MACsecPort.get_appl_table_name(): return MACsecPort(attr[1]) elif attr[0] == MACsecIngressSC.get_appl_table_name(): return MACsecIngressSC(attr[1], attr[2]) elif attr[0] == MACsecEgressSC.get_appl_table_name(): return MACsecEgressSC(attr[1], attr[2]) elif attr[0] == MACsecIngressSA.get_appl_table_name(): return MACsecIngressSA(attr[1], attr[2], attr[3]) elif attr[0] == MACsecEgressSA.get_appl_table_name(): return MACsecEgressSA(attr[1], attr[2], attr[3]) raise TypeError("Unknown MACsec object type") except ValueError as e: return None def create_macsec_profile_obj(key: str) -> MACsecCfgMeta: attr = key.split("|") try: if attr[0] == MACsecProfile.get_cfg_table_name(): return MACsecProfile(attr[1]) raise TypeError("Unknown MACsec object type") except ValueError as e: return None def create_macsec_objs(interface_name: str) -> typing.List[MACsecAppMeta]: objs = [] objs.append(create_macsec_obj(MACsecPort.get_appl_table_name() + ":" + interface_name)) egress_scs = DB_CONNECTOR.keys(DB_CONNECTOR.APPL_DB, MACsecEgressSC.get_appl_table_name() + ":" + interface_name + ":*") for sc_name in natsorted(egress_scs): sc = create_macsec_obj(sc_name) if sc is None: continue objs.append(sc) egress_sas = DB_CONNECTOR.keys(DB_CONNECTOR.APPL_DB, MACsecEgressSA.get_appl_table_name() + ":" + ":".join(sc_name.split(":")[1:]) + ":*") for sa_name in natsorted(egress_sas): sa = create_macsec_obj(sa_name) if sa is None: continue objs.append(sa) ingress_scs = DB_CONNECTOR.keys(DB_CONNECTOR.APPL_DB, MACsecIngressSC.get_appl_table_name() + ":" + interface_name + ":*") for sc_name in natsorted(ingress_scs): sc = create_macsec_obj(sc_name) if sc is None: continue objs.append(sc) ingress_sas = DB_CONNECTOR.keys(DB_CONNECTOR.APPL_DB, MACsecIngressSA.get_appl_table_name() + ":" + ":".join(sc_name.split(":")[1:]) + ":*") for sa_name in natsorted(ingress_sas): sa = create_macsec_obj(sa_name) if sa is None: continue objs.append(sa) return objs def create_macsec_profiles_objs(profile_name: str) -> typing.List[MACsecCfgMeta]: objs = [] objs.append(create_macsec_profile_obj(MACsecProfile.get_cfg_table_name() + "|" + profile_name)) return objs def cache_find(cache: dict, target: MACsecAppMeta) -> MACsecAppMeta: if not cache or not cache["objs"]: return None for obj in cache["objs"]: if type(obj) == type(target) and obj.key == target.key: # MACsec SA may be refreshed by a cycle that use the same key # So, use the SA as the identifier if isinstance(obj, MACsecSA) and obj.sak != target.sak: continue return obj return None @click.command() @click.argument('interface_name', required=False) @click.option('--profile', is_flag=True, required=False, default=False, help="show all macsec profiles") @click.option('--dump-file', is_flag=True, required=False, default=False, help="store show output to a file") @multi_asic_util.multi_asic_click_options def macsec(interface_name, dump_file, namespace, display, profile): if interface_name is not None and profile: click.echo('Interface name is not valid with profile option') return MacsecContext(namespace, display).show(interface_name, dump_file, profile) class MacsecContext(object): def __init__(self, namespace_option, display_option): self.db = None self.multi_asic = multi_asic_util.MultiAsic( display_option, namespace_option) self.macsec_profiles = [] @multi_asic_util.run_on_multi_asic def show(self, interface_name, dump_file, profile): global DB_CONNECTOR global COUNTER_TABLE DB_CONNECTOR = self.db if not profile: COUNTER_TABLE = CounterTable(self.db.get_redis_client(self.db.COUNTERS_DB)) interface_names = [name.split(":")[1] for name in self.db.keys(self.db.APPL_DB, "MACSEC_PORT*")] if interface_name is not None: if interface_name not in interface_names: return interface_names = [interface_name] objs = [] for interface_name in natsorted(interface_names): objs += create_macsec_objs(interface_name) else: profile_names = [name.split("|")[1] for name in self.db.keys(self.db.CONFIG_DB, "MACSEC_PROFILE*")] objs = [] for profile_name in natsorted(profile_names): # Check if this macsec profile is already added to profile list. This is in case of # multi-asic devices where all namespaces will have the same macsec profile defined. if profile_name not in self.macsec_profiles and not dump_file: self.macsec_profiles.append(profile_name) objs += create_macsec_profiles_objs(profile_name) cache = {} if os.path.isfile(CACHE_FILE.format(self.multi_asic.current_namespace)): cache = pickle.load(open(CACHE_FILE.format(self.multi_asic.current_namespace), "rb")) if not dump_file: if cache and cache["time"] and objs: print("Last cached time was {}".format(cache["time"])) for obj in objs: cache_obj = cache_find(cache, obj) print(obj.dump_str(cache_obj)) else: dump_obj = { "time": datetime.datetime.now(), "objs": objs } with open(CACHE_FILE.format(self.multi_asic.current_namespace), 'wb') as dump_file: pickle.dump(dump_obj, dump_file) dump_file.flush() def register(cli): cli.add_command(macsec) if __name__ == '__main__': macsec(None)