#!/usr/bin/env python
"""bgpcfgd: apply BGP peer configuration found in CONFIG_DB through vtysh.

The daemon subscribes to several CONFIG_DB tables, keeps the received
entries in an in-memory Directory, and renders/applies per-peer
configuration with ``vtysh`` once all the data a peer depends on is present.
"""

import sys
import subprocess
import datetime
import time
import syslog
import signal
import traceback
import os
import tempfile
import json
from collections import defaultdict
from pprint import pprint
from pprint import pformat

import jinja2
import netaddr
from swsscommon import swsscommon


# Main-loop flag; cleared by signal_handler() to stop Daemon.run().
g_run = True
# When True, executed commands and received messages are logged at LOG_DEBUG.
g_debug = False


def run_command(command, shell=False):
    """Run *command* and return ``(returncode, stdout, stderr)``.

    *command* is a sequence of arguments. A non-zero return code is logged
    to syslog at LOG_ERR together with the captured output.
    """
    str_cmd = " ".join(command)
    if g_debug:
        syslog.syslog(syslog.LOG_DEBUG, "execute command {}.".format(str_cmd))
    p = subprocess.Popen(command, shell=shell, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    stdout, stderr = p.communicate()
    if p.returncode != 0:
        syslog.syslog(
            syslog.LOG_ERR,
            'command execution returned {}. Command: "{}", stdout: "{}", stderr: "{}"'.format(
                p.returncode, str_cmd, stdout, stderr
            )
        )
    return p.returncode, stdout, stderr


class TemplateFabric(object):
    """Factory of jinja2 templates with the ``ipv4`` / ``ipv6`` filters installed."""

    def __init__(self):
        j2_template_paths = ['/usr/share/sonic/templates']
        j2_loader = jinja2.FileSystemLoader(j2_template_paths)
        j2_env = jinja2.Environment(loader=j2_loader, trim_blocks=True)
        j2_env.filters['ipv4'] = self.is_ipv4
        j2_env.filters['ipv6'] = self.is_ipv6
        self.env = j2_env

    def from_file(self, filename):
        """Return the template loaded from *filename* in the template path."""
        return self.env.get_template(filename)

    def from_string(self, tmpl):
        """Return a template compiled from the string *tmpl*."""
        return self.env.from_string(tmpl)

    @staticmethod
    def _has_ip_version(value, version):
        """Return True if *value* parses as an IP network of the given *version*."""
        if not value:
            return False
        if isinstance(value, netaddr.IPNetwork):
            addr = value
        else:
            try:
                addr = netaddr.IPNetwork(str(value))
            except Exception:
                # Anything that cannot be parsed is simply "not an address";
                # unlike a bare except this lets SystemExit/KeyboardInterrupt through.
                return False
        return addr.version == version

    @staticmethod
    def is_ipv4(value):
        """Return True if *value* is (or parses as) an IPv4 network/address."""
        return TemplateFabric._has_ip_version(value, 4)

    @staticmethod
    def is_ipv6(value):
        """Return True if *value* is (or parses as) an IPv6 network/address."""
        return TemplateFabric._has_ip_version(value, 6)


class Daemon(object):
    """Select loop that dispatches table updates to registered callbacks."""

    # Timeout passed to Select.select() on every loop iteration.
    SELECT_TIMEOUT = 1000

    def __init__(self):
        self.db_connectors = {}
        self.selector = swsscommon.Select()
        self.callbacks = defaultdict(lambda: defaultdict(list))  # db -> table -> []
        self.subscribers = set()

    def add_manager(self, db_name, table_name, callback):
        """Register *callback* for updates of *table_name* in database *db_name*.

        One DB connector is created per database and one subscriber per
        (database, table) pair; further callbacks for the same table share them.
        """
        db = swsscommon.SonicDBConfig.getDbId(db_name)
        if db not in self.db_connectors:
            self.db_connectors[db] = swsscommon.DBConnector(db_name, 0)

        if table_name not in self.callbacks[db]:
            conn = self.db_connectors[db]
            subscriber = swsscommon.SubscriberStateTable(conn, table_name)
            self.subscribers.add(subscriber)
            self.selector.addSelectable(subscriber)
        self.callbacks[db][table_name].append(callback)

    def run(self):
        """Dispatch updates to the callbacks until ``g_run`` becomes False.

        Raises Exception when select() reports an error.
        """
        while g_run:
            state, _ = self.selector.select(Daemon.SELECT_TIMEOUT)
            if state == self.selector.TIMEOUT:
                continue
            elif state == self.selector.ERROR:
                raise Exception("Received error from select")

            # Every subscriber is polled; the ones with nothing pending
            # return an empty key and are skipped.
            for subscriber in self.subscribers:
                key, op, fvs = subscriber.pop()
                if not key:
                    continue
                if g_debug:
                    syslog.syslog(syslog.LOG_DEBUG, "Received message : {}".format((key, op, fvs)))
                db_id = subscriber.getDbConnector().getDbId()
                for callback in self.callbacks[db_id][subscriber.getTableName()]:
                    callback(key, op, dict(fvs))


class Directory(object):
    """Two-level store (slot -> key -> value) with change notifications.

    Handlers subscribed to a (slot, path) pair are called after every put()
    into that slot for which the path exists.
    """

    def __init__(self):
        self.data = defaultdict(dict)
        self.notify = defaultdict(lambda: defaultdict(list))

    def path_traverse(self, slot, path):
        """Walk the '/'-separated *path* inside *slot*.

        Returns ``(True, value)`` when the path exists, ``(False, None)``
        otherwise. An empty path designates the slot itself.
        """
        if slot not in self.data:
            return False, None
        elif path == '':
            return True, self.data[slot]
        d = self.data[slot]
        for p in path.split("/"):
            if p not in d:
                return False, None
            d = d[p]
        return True, d

    def path_exist(self, slot, path):
        """Return True if *path* exists inside *slot*."""
        return self.path_traverse(slot, path)[0]

    def get_path(self, slot, path):
        """Return the value stored at *path* inside *slot*, or None if absent."""
        return self.path_traverse(slot, path)[1]

    def put(self, slot, key, value):
        """Store *value* under *slot*/*key* and fire the matching notifications."""
        self.data[slot][key] = value
        if slot in self.notify:
            for path in self.notify[slot].keys():
                if self.path_exist(slot, path):
                    for handler in self.notify[slot][path]:
                        handler()

    def get(self, slot, key):
        """Return the value stored under *slot*/*key* (KeyError if the key is absent)."""
        return self.data[slot][key]

    def remove(self, slot, key):
        """Delete *key* from *slot*; a missing slot or key is logged, not raised."""
        if slot in self.data:
            if key in self.data[slot]:
                del self.data[slot][key]
            else:
                syslog.syslog(syslog.LOG_ERR, "Directory: Can't remove key '%s' from slot '%s'. The key doesn't exist" % (key, slot))
        else:
            syslog.syslog(syslog.LOG_ERR, "Directory: Can't remove key '%s' from slot '%s'. The slot doesn't exist" % (key, slot))

    def remove_slot(self, slot, key=None):
        """Delete the whole *slot*; a missing slot is logged, not raised.

        *key* is unused and kept (now optional) only for backward compatibility.
        """
        if slot in self.data:
            del self.data[slot]
        else:
            syslog.syslog(syslog.LOG_ERR, "Directory: Can't remove slot '%s'. The slot doesn't exist" % slot)

    def get_slot(self, slot):
        """Return the dict of *slot*.

        Note: the backing store is a defaultdict, so reading a missing slot
        creates it empty.
        """
        return self.data[slot]

    def available_slot(self, slot):
        """Return True if *slot* exists."""
        return slot in self.data

    def available_deps(self, deps):
        """Return True if every ``(slot, path)`` pair in *deps* exists."""
        return all(self.path_exist(slot, path) for slot, path in deps)

    def subscribe(self, deps, handler):
        """Call *handler* after each put() into a slot of *deps* whose path exists."""
        for slot, path in deps:
            self.notify[slot][path].append(handler)


class Manager(object):
    """Base class of the table managers.

    SET operations are forwarded to set_handler() once all *deps* are
    available in the directory. Entries that cannot be handled yet
    (set_handler() returned a false value, or deps are missing) are queued
    and retried when the dependencies change.
    """

    def __init__(self, daemon, directory, deps, database, table_name):
        self.directory = directory
        self.deps = deps
        self.set_queue = []
        daemon.add_manager(database, table_name, self.handler)
        directory.subscribe(deps, self.on_deps_change)

    def handler(self, key, op, data):
        """Entry point called by the Daemon for every update of the table."""
        if op == swsscommon.SET_COMMAND:
            if self.directory.available_deps(self.deps):
                res = self.set_handler(key, data)
                if not res:
                    # The handler asked for a retry.
                    self.set_queue.append((key, data))
            else:
                self.set_queue.append((key, data))
        elif op == swsscommon.DEL_COMMAND:
            self.del_handler(key)
        else:
            syslog.syslog(syslog.LOG_ERR, 'Invalid operation "%s" for key "%s"' % (op, key))

    def on_deps_change(self):
        """Retry the queued SET operations once all dependencies are available."""
        if not self.directory.available_deps(self.deps):
            return
        new_queue = []
        for key, data in self.set_queue:
            res = self.set_handler(key, data)
            if not res:
                new_queue.append((key, data))
        self.set_queue = new_queue

    def set_handler(self, key, data):
        """Handle a SET; return True when done, a false value to be retried later.

        Subclasses are expected to override this. The default only logs the
        missing implementation. (It used to format ``self.__name__``, which
        instances do not have, and so raised AttributeError.)
        """
        syslog.syslog(syslog.LOG_ERR, "%s wasn't implemented for %s" % ("set_handler", self.__class__))
        # Nothing can be retried for an unimplemented handler: don't queue it.
        return True

    def del_handler(self, key):
        """Handle a DEL. Subclasses are expected to override this."""
        syslog.syslog(syslog.LOG_ERR, "%s wasn't implemented for %s" % ("del_handler", self.__class__))


class BGPDeviceMetaMgr(Manager):
    """Mirror the ``localhost`` device metadata into the "meta" slot."""

    def __init__(self, daemon, directory):
        super(BGPDeviceMetaMgr, self).__init__(
            daemon,
            directory,
            [],
            "CONFIG_DB",
            swsscommon.CFG_DEVICE_METADATA_TABLE_NAME
        )

    def set_handler(self, key, data):
        """Store *data* under "meta" when it is for localhost and carries a new bgp_asn.

        Always returns True. Ignored and unchanged entries used to return
        None, which made Manager.handler() queue them; this manager has no
        dependencies, so that queue was never drained and only grew.
        """
        if key != "localhost" or "bgp_asn" not in data:
            return True
        if self.directory.path_exist("meta", "localhost/bgp_asn"):
            bgp_asn = self.directory.get_path("meta", "localhost/bgp_asn")
            if bgp_asn == data["bgp_asn"]:
                # Same ASN as already stored: nothing to update or notify.
                return True
        self.directory.put("meta", key, data)
        return True

    def del_handler(self, key):
        self.directory.remove("meta", key)


class BGPNeighborMetaMgr(Manager):
    """Mirror the device neighbor metadata table into the "neigmeta" slot."""

    def __init__(self, daemon, directory):
        super(BGPNeighborMetaMgr, self).__init__(
            daemon,
            directory,
            [],
            "CONFIG_DB",
            swsscommon.CFG_DEVICE_NEIGHBOR_METADATA_TABLE_NAME
        )

    def set_handler(self, key, data):
        self.directory.put("neigmeta", key, data)
        return True

    def del_handler(self, key):
        self.directory.remove("neigmeta", key)


class BGPPeerMgr(Manager):
    """Add, remove and shut down/enable BGP peers through vtysh.

    ``self.peers`` is the set of already configured peers, stored as
    normalized "vrf|neighbor" strings (see normalize_key()).
    """

    def __init__(self, daemon, directory):
        super(BGPPeerMgr, self).__init__(
            daemon,
            directory,
            [
                ("meta", "localhost/bgp_asn"),
                ("neigmeta", ""),
                ("local_addresses", ""),
                ("interfaces", ""),
            ],
            "CONFIG_DB",
            swsscommon.CFG_BGP_NEIGHBOR_TABLE_NAME
        )
        self.peers = self.load_peers()
        fabric = TemplateFabric()
        self.templates = {
            "add": fabric.from_file('bgpd.peer.conf.j2'),
            "delete": fabric.from_string('no neighbor {{ neighbor_addr }}'),
            "shutdown": fabric.from_string('neighbor {{ neighbor_addr }} shutdown'),
            "no shutdown": fabric.from_string('no neighbor {{ neighbor_addr }} shutdown'),
        }

    def set_handler(self, key, data):
        """Add a new peer, or update the admin status of a configured one.

        Returns False when the peer still waits for data (it is then retried
        by the Manager), True otherwise.
        """
        key = self.normalize_key(key)
        vrf, nbr = key.split('|', 1)
        if key not in self.peers:
            return self._add_peer(key, vrf, nbr, data)
        # when the peer is already configured we support "shutdown/no shutdown"
        # commands for the peers only
        self._update_admin_status(key, vrf, nbr, data)
        return True

    def _add_peer(self, key, vrf, nbr, data):
        """Render and apply the "add" template for a peer that is not configured yet."""
        if "local_addr" not in data:
            syslog.syslog(syslog.LOG_WARNING, 'Peer {}. Error in missing required attribute "local_addr"'.format(key))
        else:
            # The bgp session that belongs to a vnet cannot be advertised as the default BGP session.
            # So we need to check whether this bgp session belongs to a vnet.
            interface = InterfaceMgr.get_local_interface(self.directory, data["local_addr"])
            if not interface:
                syslog.syslog(
                    syslog.LOG_INFO,
                    'Peer {} with local address {} wait for the corresponding interface to be set'.format(
                        key,
                        data["local_addr"]
                    )
                )
                return False
            vnet = InterfaceMgr.get_vnet(interface)
            if vnet:
                # Ignore the bgp session that is in a vnet
                syslog.syslog(
                    syslog.LOG_INFO,
                    'Ignore the BGP peer {} as the interface {} is in vnet {}'.format(
                        key,
                        interface,
                        vnet
                    )
                )
                return True

        neigmeta = self.directory.get_slot("neigmeta")
        if 'name' in data and data["name"] not in neigmeta:
            syslog.syslog(
                syslog.LOG_INFO,
                'Peer {} with neighbor name {} wait for the corresponding neighbor metadata to be set'.format(
                    key,
                    data["name"]
                )
            )
            return False

        try:
            cmd = self.templates["add"].render(
                DEVICE_METADATA=self.directory.get_slot("meta"),
                DEVICE_NEIGHBOR_METADATA=neigmeta,
                neighbor_addr=nbr,
                bgp_session=data
            )
        except Exception:
            # A rendering failure will not be cured by a retry: report it as handled.
            syslog.syslog(syslog.LOG_ERR, 'Peer {}. Error in rendering the template for "SET" command {}'.format(key, data))
            return True

        if self.apply_op(cmd, vrf):
            self.peers.add(key)
            syslog.syslog(syslog.LOG_INFO, 'Peer {} added with attributes {}'.format(key, data))
        else:
            syslog.syslog(syslog.LOG_ERR, "Peer {} wasn't added.".format(key))
        return True

    def _update_admin_status(self, key, vrf, nbr, data):
        """Apply "shutdown" / "no shutdown" to a configured peer according to admin_status."""
        if "admin_status" not in data:
            syslog.syslog(syslog.LOG_ERR, "Peer {}: Can't update the peer. No 'admin_status' attribute in the request".format(key))
            return

        status = data['admin_status']
        template_names = {'up': "no shutdown", 'down': "shutdown"}
        if status not in template_names:
            syslog.syslog(syslog.LOG_ERR, "Peer {}: Can't update the peer. has wrong attribute value attr['admin_status'] = '{}'".format(key, status))
            return

        cmd = self.templates[template_names[status]].render(neighbor_addr=nbr)
        if self.apply_op(cmd, vrf):
            syslog.syslog(syslog.LOG_INFO, 'Peer {} admin state is set to "{}"'.format(key, status))
        else:
            syslog.syslog(syslog.LOG_ERR, "Peer {} admin state wasn't set to '{}'.".format(key, status))

    def del_handler(self, key):
        """Remove a configured peer; an unknown peer is only logged."""
        key = self.normalize_key(key)
        vrf, nbr = key.split('|', 1)
        if key not in self.peers:
            syslog.syslog(syslog.LOG_WARNING, 'Peer {} has not been found'.format(key))
            return
        cmd = self.templates["delete"].render(neighbor_addr=nbr)
        rc = self.apply_op(cmd, vrf)
        if rc:
            syslog.syslog(syslog.LOG_INFO, 'Peer {} has been removed'.format(key))
            self.peers.remove(key)
        else:
            syslog.syslog(syslog.LOG_ERR, "Peer {} hasn't been removed".format(key))

    def apply_op(self, cmd, vrf):
        """Apply *cmd* under ``router bgp <asn> [vrf <vrf>]`` with ``vtysh -f``.

        Returns True when vtysh exits with code 0.
        """
        bgp_asn = self.directory.get_slot("meta")["localhost"]["bgp_asn"]
        fd, tmp_filename = tempfile.mkstemp(dir='/tmp')
        try:
            with os.fdopen(fd, 'w') as fp:
                if vrf == 'default':
                    fp.write('router bgp %s\n' % bgp_asn)
                else:
                    fp.write('router bgp %s vrf %s\n' % (bgp_asn, vrf))
                fp.write("%s\n" % cmd)

            command = ["vtysh", "-f", tmp_filename]
            rc, _, _ = run_command(command)
        finally:
            # Never leave the temporary file behind, even if the command raised.
            os.remove(tmp_filename)
        return rc == 0

    @staticmethod
    def normalize_key(key):
        """Return *key* as "vrf|neighbor", assuming the "default" vrf when none is given."""
        if '|' not in key:
            return 'default|' + key
        else:
            return key

    @staticmethod
    def load_peers():
        """Return the set of peers vtysh reports, as "vrf|neighbor" strings.

        The keys use the format produced by normalize_key(), so that
        set_handler()/del_handler() recognise the loaded peers. (Storing
        ``(vrf, nbr)`` tuples, as before, never matched the string lookups.)
        """
        vrfs = []
        command = ["vtysh", "-c", "show bgp vrfs json"]
        rc, out, err = run_command(command)
        if rc == 0:
            js_vrf = json.loads(out)
            vrfs = js_vrf['vrfs'].keys()

        peers = set()
        for vrf in vrfs:
            command = ["vtysh", "-c", 'show bgp vrf {} neighbors json'.format(vrf)]
            rc, out, err = run_command(command)
            if rc == 0:
                js_bgp = json.loads(out)
                for nbr in js_bgp.keys():
                    peers.add('{}|{}'.format(vrf, nbr))

        return peers


class InterfaceMgr(Manager):
    """Mirror an interface table into the "interfaces" and "local_addresses" slots."""

    def __init__(self, daemon, directory, interface_table = swsscommon.CFG_INTF_TABLE_NAME):
        super(InterfaceMgr, self).__init__(
            daemon,
            directory,
            [],
            "CONFIG_DB",
            interface_table
        )

    def set_handler(self, key, data):
        """Store an interface entry.

        "<interface>|<prefix>" keys go to "local_addresses" (keyed by the IP),
        plain "<interface>" keys go to "interfaces". Always returns True: a
        malformed prefix cannot be fixed by a retry, and this manager has no
        dependencies that would ever drain the retry queue.
        """
        # Interface table can have two keys,
        # one with ip prefix and one without ip prefix
        if '|' in key:
            data = {}
            data["interface"], network = key.split('|', 1)
            try:
                network = netaddr.IPNetwork(str(network))
            except Exception:
                syslog.syslog(
                    syslog.LOG_WARNING,
                    'Subnet {} format is wrong for interface {}'.format(
                        network,
                        data["interface"]
                    )
                )
                return True
            data["prefixlen"] = str(network.prefixlen)
            ip = str(network.ip)
            self.directory.put("local_addresses", ip, data)
        else:
            self.directory.put("interfaces", key, data)
        return True

    def del_handler(self, key):
        """Remove the entry stored by set_handler() for *key*."""
        if '|' in key:
            interface, network = key.split('|', 1)
            try:
                network = netaddr.IPNetwork(str(network))
            except Exception:
                syslog.syslog(
                    syslog.LOG_WARNING,
                    'Subnet {} format is wrong for interface {}'.format(
                        network,
                        interface
                    )
                )
                return False
            ip = str(network.ip)
            self.directory.remove("local_addresses", ip)
        else:
            self.directory.remove("interfaces", key)

    @staticmethod
    def get_local_interface(directory, local_addr):
        """
        @summary: Get interface according to the local address from the directory
        @param directory: Directory object that stored metadata of interfaces
        @param local_addr: Local address of the interface
        @return: Return the metadata of the interface with the local address
                 If the interface has not been set, return None
        """
        local_addresses = directory.get_slot("local_addresses")
        # Check if the local address of this bgp session has been set
        if local_addr not in local_addresses:
            return None
        local_address = local_addresses[local_addr]
        interfaces = directory.get_slot("interfaces")
        # Check if the information for the interface of this local address has been set
        # ("in" instead of dict.has_key(), which only exists in Python 2)
        if "interface" in local_address and local_address["interface"] in interfaces:
            return interfaces[local_address["interface"]]
        else:
            return None

    @staticmethod
    def get_vnet(interface):
        """
        @summary: Get the VNet name of the interface
        @param interface: The metadata of the interface
        @return: Return the vnet name of the interface if this interface belongs to a vnet,
                 Otherwise return None
        """
        # A missing or empty "vnet_name" both mean "not in a vnet".
        return interface.get("vnet_name") or None


class LoopbackInterfaceMgr(InterfaceMgr):
    """InterfaceMgr bound to the loopback interface table."""

    def __init__(self, daemon, directory):
        super(LoopbackInterfaceMgr, self).__init__(
            daemon,
            directory,
            swsscommon.CFG_LOOPBACK_INTERFACE_TABLE_NAME
        )


class VlanInterfaceMgr(InterfaceMgr):
    """InterfaceMgr bound to the vlan interface table."""

    def __init__(self, daemon, directory):
        super(VlanInterfaceMgr, self).__init__(
            daemon,
            directory,
            swsscommon.CFG_VLAN_INTF_TABLE_NAME
        )


class PortChannelInterfaceMgr(InterfaceMgr):
    """InterfaceMgr bound to the LAG (port channel) interface table."""

    def __init__(self, daemon, directory):
        super(PortChannelInterfaceMgr, self).__init__(
            daemon,
            directory,
            swsscommon.CFG_LAG_INTF_TABLE_NAME
        )


def wait_for_bgpd():
    """Poll ``vtysh -c "show daemons"`` until bgpd is listed.

    Raises RuntimeError if bgpd does not show up within 20 seconds.
    """
    # wait for 20 seconds
    stop_time = datetime.datetime.now() + datetime.timedelta(seconds=20)
    syslog.syslog(syslog.LOG_INFO, "Start waiting for bgpd: %s" % str(datetime.datetime.now()))
    while datetime.datetime.now() < stop_time:
        rc, out, err = run_command(["vtysh", "-c", "show daemons"])
        if rc == 0 and "bgpd" in out:
            syslog.syslog(syslog.LOG_INFO, "bgpd connected to vtysh: %s" % str(datetime.datetime.now()))
            return
        time.sleep(0.1)  # sleep 100 ms
    raise RuntimeError("bgpd hasn't been started in 20 seconds")


def main():
    """Wait for bgpd, instantiate every manager and run the daemon loop."""
    managers = [
        BGPDeviceMetaMgr,
        BGPNeighborMetaMgr,
        BGPPeerMgr,
        InterfaceMgr,
        LoopbackInterfaceMgr,
        VlanInterfaceMgr,
        PortChannelInterfaceMgr,
    ]
    wait_for_bgpd()
    daemon = Daemon()
    directory = Directory()
    # The managers register themselves with the daemon and the directory
    # from their constructors.
    manager_instanses = [ manager(daemon, directory) for manager in managers ]
    daemon.run()


def signal_handler(signum, frame):
    """Ask the main loop to stop (installed for SIGTERM and SIGINT)."""
    global g_run
    g_run = False


if __name__ == '__main__':
    rc = 0
    try:
        syslog.openlog('bgpcfgd')
        signal.signal(signal.SIGTERM, signal_handler)
        signal.signal(signal.SIGINT, signal_handler)
        main()
    except KeyboardInterrupt:
        syslog.syslog(syslog.LOG_NOTICE, "Keyboard interrupt")
    except RuntimeError as e:
        syslog.syslog(syslog.LOG_CRIT, "%s" % str(e))
        rc = -2
    except Exception as e:
        syslog.syslog(syslog.LOG_CRIT, "Got an exception %s: Traceback: %s" % (str(e), traceback.format_exc()))
        rc = -1
    finally:
        syslog.closelog()
    try:
        sys.exit(rc)
    except SystemExit:
        os._exit(rc)