#!/usr/bin/env python import sys import subprocess import datetime import time import syslog import signal import traceback import os import tempfile import json from collections import defaultdict from pprint import pprint from pprint import pformat import jinja2 import netaddr from swsscommon import swsscommon g_run = True g_debug = False def run_command(command, shell=False): str_cmd = " ".join(command) if g_debug: syslog.syslog(syslog.LOG_DEBUG, "execute command {}.".format(str_cmd)) p = subprocess.Popen(command, shell=shell, stdout=subprocess.PIPE, stderr=subprocess.PIPE) stdout, stderr = p.communicate() if p.returncode != 0: syslog.syslog(syslog.LOG_ERR, 'command execution returned {}. Command: "{}", stdout: "{}", stderr: "{}"'.format(p.returncode, str_cmd, stdout, stderr)) return p.returncode, stdout, stderr class TemplateFabric(object): def __init__(self): j2_template_paths = ['/usr/share/sonic/templates'] j2_loader = jinja2.FileSystemLoader(j2_template_paths) j2_env = jinja2.Environment(loader=j2_loader, trim_blocks=True) j2_env.filters['ipv4'] = self.is_ipv4 j2_env.filters['ipv6'] = self.is_ipv6 self.env = j2_env def from_file(self, filename): return self.env.get_template(filename) def from_string(self, tmpl): return self.env.from_string(tmpl) @staticmethod def is_ipv4(value): if not value: return False if isinstance(value, netaddr.IPNetwork): addr = value else: try: addr = netaddr.IPNetwork(str(value)) except: return False return addr.version == 4 @staticmethod def is_ipv6(value): if not value: return False if isinstance(value, netaddr.IPNetwork): addr = value else: try: addr = netaddr.IPNetwork(str(value)) except: return False return addr.version == 6 class Daemon(object): SELECT_TIMEOUT = 1000 def __init__(self): self.db_connectors = {} self.selector = swsscommon.Select() self.callbacks = defaultdict(lambda : defaultdict(list)) # db -> table -> [] self.subscribers = set() def add_manager(self, db, table_name, callback): if db not in self.db_connectors: self.db_connectors[db] = swsscommon.DBConnector(db, swsscommon.DBConnector.DEFAULT_UNIXSOCKET, 0) if table_name not in self.callbacks[db]: conn = self.db_connectors[db] subscriber = swsscommon.SubscriberStateTable(conn, table_name) self.subscribers.add(subscriber) self.selector.addSelectable(subscriber) self.callbacks[db][table_name].append(callback) def run(self): while g_run: state, _ = self.selector.select(Daemon.SELECT_TIMEOUT) if state == self.selector.TIMEOUT: continue elif state == self.selector.ERROR: raise Exception("Received error from select") for subscriber in self.subscribers: key, op, fvs = subscriber.pop() if not key: continue if g_debug: syslog.syslog(syslog.LOG_DEBUG, "Received message : {}".format((key, op, fvs))) for callback in self.callbacks[subscriber.getDbConnector().getDbId()][subscriber.getTableName()]: callback(key, op, dict(fvs)) class Directory(object): def __init__(self): self.data = defaultdict(dict) self.notify = defaultdict(lambda: defaultdict(list)) def path_traverse(self, slot, path): if slot not in self.data: return False, None elif path == '': return True, self.data[slot] d = self.data[slot] for p in path.split("/"): if p not in d: return False, None d = d[p] return True, d def path_exist(self, slot, path): return self.path_traverse(slot, path)[0] def get_path(self, slot, path): return self.path_traverse(slot, path)[1] def put(self, slot, key, value): self.data[slot][key] = value if slot in self.notify: for path in self.notify[slot].keys(): if self.path_exist(slot, path): for handler in self.notify[slot][path]: handler() def get(self, slot, key): return self.data[slot][key] def remove(self, slot, key): if slot in self.data: if key in self.data[slot]: del self.data[slot][key] else: syslog.syslog(syslog.LOG_ERR, "Directory: Can't remove key '%s' from slot '%s'. The key doesn't exist" % (key, slot)) else: syslog.syslog(syslog.LOG_ERR, "Directory: Can't remove key '%s' from slot '%s'. The slot doesn't exist" % (key, slot)) def remove_slot(self, slot, key): if slot in self.data: del self.data[slot] else: syslog.syslog(syslog.LOG_ERR, "Directory: Can't remove slot '%s'. The slot doesn't exist" % slot) def get_slot(self, slot): return self.data[slot] def available_slot(self, slot): return slot in self.data def available_deps(self, deps): res = True for slot, path in deps: res = res and self.path_exist(slot, path) return res def subscribe(self, deps, handler): for slot, path in deps: self.notify[slot][path].append(handler) class Manager(object): def __init__(self, daemon, directory, deps, database, table_name): self.directory = directory self.deps = deps self.set_queue = [] daemon.add_manager(database, table_name, self.handler) directory.subscribe(deps, self.on_deps_change) def handler(self, key, op, data): if op == swsscommon.SET_COMMAND: if self.directory.available_deps(self.deps): res = self.set_handler(key, data) if not res: self.set_queue.append((key, data)) else: self.set_queue.append((key, data)) elif op == swsscommon.DEL_COMMAND: self.del_handler(key) else: syslog.syslog(syslog.LOG_ERR, 'Invalid operation "%s" for key "%s"' % (op, key)) def on_deps_change(self): new_queue = [] for key, data in self.set_queue: res = self.set_handler(key, data) if not res: new_queue.append((key, data)) self.set_queue = new_queue def set_handler(self, key, data): syslog.syslog(syslog.LOG_ERR, "%s wasn't implemented for %s" % (self.__name__, self.__class__)) def del_handler(self, key): syslog.syslog(syslog.LOG_ERR, "%s wasn't implemented for %s" % (self.__name__, self.__class__)) class BGPDeviceMetaMgr(Manager): def __init__(self, daemon, directory): super(BGPDeviceMetaMgr, self).__init__( daemon, directory, [], swsscommon.CONFIG_DB, swsscommon.CFG_DEVICE_METADATA_TABLE_NAME ) def set_handler(self, key, data): if key != "localhost" or "bgp_asn" not in data: return if self.directory.path_exist("meta", "localhost/bgp_asn"): bgp_asn = self.directory.get_path("meta", "localhost/bgp_asn") if bgp_asn == data["bgp_asn"]: return self.directory.put("meta", key, data) return True def del_handler(self, key): self.directory.remove("meta", key) class BGPNeighborMetaMgr(Manager): def __init__(self, daemon, directory): super(BGPNeighborMetaMgr, self).__init__( daemon, directory, [], swsscommon.CONFIG_DB, swsscommon.CFG_DEVICE_NEIGHBOR_METADATA_TABLE_NAME ) def set_handler(self, key, data): self.directory.put("neigmeta", key, data) return True def del_handler(self, key): self.directory.remove("neigmeta", key) class BGPPeerMgr(Manager): def __init__(self, daemon, directory): super(BGPPeerMgr, self).__init__( daemon, directory, [ ("meta", "localhost/bgp_asn"), ("neigmeta", ""), ], swsscommon.CONFIG_DB, swsscommon.CFG_BGP_NEIGHBOR_TABLE_NAME ) self.peers = self.load_peers() fabric = TemplateFabric() self.templates = { "add": fabric.from_file('bgpd.peer.conf.j2'), "delete": fabric.from_string('no neighbor {{ neighbor_addr }}'), "shutdown": fabric.from_string('neighbor {{ neighbor_addr }} shutdown'), "no shutdown": fabric.from_string('no neighbor {{ neighbor_addr }} shutdown'), } def set_handler(self, key, data): key = self.normalize_key(key) vrf, nbr = key.split('|', 1) if key not in self.peers: cmd = None neigmeta = self.directory.get_slot("neigmeta") if 'name' in data and data["name"] not in neigmeta: return False try: cmd = self.templates["add"].render( DEVICE_METADATA=self.directory.get_slot("meta"), DEVICE_NEIGHBOR_METADATA=neigmeta, neighbor_addr=nbr, bgp_session=data ) except: syslog.syslog(syslog.LOG_ERR, 'Peer {}. Error in rendering the template for "SET" command {}'.format(key, data)) return True if cmd is not None: rc = self.apply_op(cmd, vrf) if rc: self.peers.add(key) syslog.syslog(syslog.LOG_INFO, 'Peer {} added with attributes {}'.format(key, data)) else: syslog.syslog(syslog.LOG_ERR, "Peer {} wasn't added.".format(key)) else: # when the peer is already configured we support "shutdown/no shutdown" # commands for the peers only if "admin_status" in data: if data['admin_status'] == 'up': rc = self.apply_op(self.templates["no shutdown"].render(neighbor_addr=nbr), vrf) if rc: syslog.syslog(syslog.LOG_INFO, 'Peer {} admin state is set to "up"'.format(key)) else: syslog.syslog(syslog.LOG_ERR, "Peer {} admin state wasn't set to 'up'.".format(key)) elif data['admin_status'] == 'down': rc = self.apply_op(self.templates["shutdown"].render(neighbor_addr=nbr), vrf) if rc: syslog.syslog(syslog.LOG_INFO, 'Peer {} admin state is set to "down"'.format(key)) else: syslog.syslog(syslog.LOG_ERR, "Peer {} admin state wasn't set to 'down'.".format(key)) else: syslog.syslog(syslog.LOG_ERR, "Peer {}: Can't update the peer. has wrong attribute value attr['admin_status'] = '{}'".format(key, data['admin_status'])) else: syslog.syslog(syslog.LOG_ERR, "Peer {}: Can't update the peer. No 'admin_status' attribute in the request".format(key)) return True def del_handler(self, key): key = self.normalize_key(key) vrf, nbr = key.split('|', 1) if key not in self.peers: syslog.syslog(syslog.LOG_WARNING, 'Peer {} has not been found'.format(key)) return cmd = self.templates["delete"].render(neighbor_addr=nbr) rc = self.apply_op(cmd, vrf) if rc: syslog.syslog(syslog.LOG_INFO, 'Peer {} has been removed'.format(key)) self.peers.remove(key) else: syslog.syslog(syslog.LOG_ERR, "Peer {} hasn't been removed".format(key)) def apply_op(self, cmd, vrf): bgp_asn = self.directory.get_slot("meta")["localhost"]["bgp_asn"] fd, tmp_filename = tempfile.mkstemp(dir='/tmp') os.close(fd) with open(tmp_filename, 'w') as fp: if vrf == 'default': fp.write('router bgp %s\n' % bgp_asn) else: fp.write('router bgp %s vrf %s\n' % (bgp_asn, vrf)) fp.write("%s\n" % cmd) command = ["vtysh", "-f", tmp_filename] rc, _, _ = run_command(command) os.remove(tmp_filename) return rc == 0 @staticmethod def normalize_key(key): if '|' not in key: return 'default|' + key else: return key @staticmethod def load_peers(): vrfs = [] command = ["vtysh", "-c", "show bgp vrfs json"] rc, out, err = run_command(command) if rc == 0: js_vrf = json.loads(out) vrfs = js_vrf['vrfs'].keys() peers = set() for vrf in vrfs: command = ["vtysh", "-c", 'show bgp vrf {} neighbors json'.format(vrf)] rc, out, err = run_command(command) if rc == 0: js_bgp = json.loads(out) for nbr in js_bgp.keys(): peers.add((vrf, nbr)) return peers def wait_for_bgpd(): # wait for 20 seconds stop_time = datetime.datetime.now() + datetime.timedelta(seconds=20) syslog.syslog(syslog.LOG_INFO, "Start waiting for bgpd: %s" % str(datetime.datetime.now())) while datetime.datetime.now() < stop_time: rc, out, err = run_command(["vtysh", "-c", "show daemons"]) if rc == 0 and "bgpd" in out: syslog.syslog(syslog.LOG_INFO, "bgpd connected to vtysh: %s" % str(datetime.datetime.now())) return time.sleep(0.1) # sleep 100 ms raise RuntimeError("bgpd hasn't been started in 20 seconds") def main(): managers = [ BGPDeviceMetaMgr, BGPNeighborMetaMgr, BGPPeerMgr, ] wait_for_bgpd() daemon = Daemon() directory = Directory() manager_instanses = [ manager(daemon, directory) for manager in managers ] daemon.run() def signal_handler(signum, frame): global g_run g_run = False if __name__ == '__main__': rc = 0 try: syslog.openlog('bgpcfgd') signal.signal(signal.SIGTERM, signal_handler) signal.signal(signal.SIGINT, signal_handler) main() except KeyboardInterrupt: syslog.syslog(syslog.LOG_NOTICE, "Keyboard interrupt") except RuntimeError as e: syslog.syslog(syslog.LOG_CRIT, "%s" % str(e)) rc = -2 except Exception as e: syslog.syslog(syslog.LOG_CRIT, "Got an exception %s: Traceback: %s" % (str(e), traceback.format_exc())) rc = -1 finally: syslog.closelog() try: sys.exit(rc) except SystemExit: os._exit(rc)