#!/usr/bin/env python import sys import subprocess import datetime import time import syslog import signal import traceback import os import shutil import tempfile import json from collections import defaultdict from pprint import pprint import jinja2 import netaddr from swsscommon import swsscommon g_run = True g_debug = False def run_command(command, shell=False): str_cmd = " ".join(command) if g_debug: syslog.syslog(syslog.LOG_DEBUG, "execute command {}.".format(str_cmd)) p = subprocess.Popen(command, shell=shell, stdout=subprocess.PIPE, stderr=subprocess.PIPE) stdout, stderr = p.communicate() if p.returncode != 0: syslog.syslog(syslog.LOG_ERR, 'command execution returned {}. Command: "{}", stdout: "{}", stderr: "{}"'.format(p.returncode, str_cmd, stdout, stderr)) return p.returncode, stdout, stderr class TemplateFabric(object): def __init__(self): j2_template_paths = ['/usr/share/sonic/templates'] j2_loader = jinja2.FileSystemLoader(j2_template_paths) j2_env = jinja2.Environment(loader=j2_loader, trim_blocks=True) j2_env.filters['ipv4'] = self.is_ipv4 j2_env.filters['ipv6'] = self.is_ipv6 self.env = j2_env def from_file(self, filename): return self.env.get_template(filename) def from_string(self, tmpl): return self.env.from_string(tmpl) @staticmethod def is_ipv4(value): if not value: return False if isinstance(value, netaddr.IPNetwork): addr = value else: try: addr = netaddr.IPNetwork(str(value)) except: return False return addr.version == 4 @staticmethod def is_ipv6(value): if not value: return False if isinstance(value, netaddr.IPNetwork): addr = value else: try: addr = netaddr.IPNetwork(str(value)) except: return False return addr.version == 6 class BGPConfigManager(object): def __init__(self, daemon): self.bgp_asn = None self.meta = None self.bgp_messages = [] self.peers = self.load_peers() # we can have bgp monitors peers here. it could be fixed by adding support for it here fabric = TemplateFabric() self.bgp_peer_add_template = fabric.from_file('bgpd.peer.conf.j2') self.bgp_peer_del_template = fabric.from_string('no neighbor {{ neighbor_addr }}') self.bgp_peer_shutdown = fabric.from_string('neighbor {{ neighbor_addr }} shutdown') self.bgp_peer_no_shutdown = fabric.from_string('no neighbor {{ neighbor_addr }} shutdown') daemon.add_manager(swsscommon.CONFIG_DB, swsscommon.CFG_DEVICE_METADATA_TABLE_NAME, self.__metadata_handler) daemon.add_manager(swsscommon.CONFIG_DB, swsscommon.CFG_BGP_NEIGHBOR_TABLE_NAME, self.__bgp_handler) def load_peers(self): peers = set() command = ["vtysh", "-c", "show bgp neighbors json"] rc, out, err = run_command(command) if rc == 0: js_bgp = json.loads(out) peers = set(js_bgp.keys()) return peers def __metadata_handler(self, key, op, data): if key != "localhost" \ or "bgp_asn" not in data \ or self.bgp_asn == data["bgp_asn"]: return # TODO add ASN update commands self.meta = { 'localhost': data } self.bgp_asn = data["bgp_asn"] self.__update_bgp() def __update_bgp(self): cmds = [] for key, op, data in self.bgp_messages: if op == swsscommon.SET_COMMAND: if key not in self.peers: cmds.append(self.bgp_peer_add_template.render(DEVICE_METADATA=self.meta, neighbor_addr=key, bgp_session=data)) syslog.syslog(syslog.LOG_INFO, 'Peer {} added with attributes {}'.format(key, data)) self.peers.add(key) else: # when the peer is already configured we support "shutdown/no shutdown" # commands for the peers only if "admin_status" in data: if data['admin_status'] == 'up': cmds.append(self.bgp_peer_no_shutdown.render(neighbor_addr=key)) syslog.syslog(syslog.LOG_INFO, 'Peer {} admin state is set to "up"'.format(key)) elif data['admin_status'] == 'down': cmds.append(self.bgp_peer_shutdown.render(neighbor_addr=key)) syslog.syslog(syslog.LOG_INFO, 'Peer {} admin state is set to "down"'.format(key)) else: syslog.syslog(syslog.LOG_ERR, "Peer {}: Can't update the peer. has wrong attribute value attr['admin_status'] = '{}'".format(key, data['admin_status'])) else: syslog.syslog(syslog.LOG_INFO, "Peer {}: Can't update the peer. No 'admin_status' attribute in the request".format(key)) elif op == swsscommon.DEL_COMMAND: if key in self.peers: cmds.append(self.bgp_peer_del_template.render(neighbor_addr=key)) syslog.syslog(syslog.LOG_INFO, 'Peer {} has been removed'.format(key)) self.peers.remove(key) else: syslog.syslog(syslog.LOG_WARNING, 'Peer {} is not found'.format(key)) self.bgp_messages = [] if len(cmds) == 0: return fd, tmp_filename = tempfile.mkstemp(dir='/tmp') os.close(fd) with open (tmp_filename, 'w') as fp: fp.write('router bgp %s\n' % self.bgp_asn) for cmd in cmds: fp.write("%s\n" % cmd) command = ["vtysh", "-f", tmp_filename] run_command(command) #FIXME os.remove(tmp_filename) def __bgp_handler(self, key, op, data): self.bgp_messages.append((key, op, data)) # If ASN is not set, we just cache this message until the ASN is set. if self.bgp_asn is not None: self.__update_bgp() class Daemon(object): SELECT_TIMEOUT = 1000 DATABASE_LIST = [ swsscommon.CONFIG_DB ] def __init__(self): self.db_connectors = { db : swsscommon.DBConnector(db, swsscommon.DBConnector.DEFAULT_UNIXSOCKET, 0) for db in Daemon.DATABASE_LIST } self.selector = swsscommon.Select() self.callbacks = defaultdict(lambda : defaultdict(list)) # db -> table -> [] self.subscribers = set() def add_manager(self, db, table_name, callback): if db not in Daemon.DATABASE_LIST: raise ValueError("database {} isn't supported. Supported '{}' only.".format(db, ",".join(Daemon.DATABASE_LIST))) if table_name not in self.callbacks[db]: conn = self.db_connectors[db] subscriber = swsscommon.SubscriberStateTable(conn, table_name) self.subscribers.add(subscriber) self.selector.addSelectable(subscriber) self.callbacks[db][table_name].append(callback) def run(self): while g_run: state, _ = self.selector.select(Daemon.SELECT_TIMEOUT) if state == self.selector.TIMEOUT: continue elif state == self.selector.ERROR: raise Exception("Received error from select") for subscriber in self.subscribers: key, op, fvs = subscriber.pop() if not key: continue if g_debug: syslog.syslog(syslog.LOG_DEBUG, "Received message : {}".format((key, op, fvs))) for callback in self.callbacks[subscriber.getDbConnector().getDbId()][subscriber.getTableName()]: callback(key, op, dict(fvs)) def wait_for_bgpd(): # wait for 20 seconds stop_time = datetime.datetime.now() + datetime.timedelta(seconds=20) syslog.syslog(syslog.LOG_INFO, "Start waiting for bgpd: %s" % str(datetime.datetime.now())) while datetime.datetime.now() < stop_time: rc, out, err = run_command(["vtysh", "-c", "show daemons"]) if rc == 0 and "bgpd" in out: syslog.syslog(syslog.LOG_INFO, "bgpd connected to vtysh: %s" % str(datetime.datetime.now())) return time.sleep(0.1) # sleep 100 ms raise RuntimeError("bgpd hasn't been started in 20 seconds") def main(): wait_for_bgpd() daemon = Daemon() bgp_manager = BGPConfigManager(daemon) daemon.run() def signal_handler(signum, frame): global g_run g_run = False if __name__ == '__main__': rc = 0 try: syslog.openlog('bgpcfgd') signal.signal(signal.SIGTERM, signal_handler) signal.signal(signal.SIGINT, signal_handler) main() except KeyboardInterrupt: syslog.syslog(syslog.LOG_NOTICE, "Keyboard interrupt") except RuntimeError as e: syslog.syslog(syslog.LOG_CRIT, "%s" % str(e)) rc = -2 except Exception as e: syslog.syslog(syslog.LOG_CRIT, "Got an exception %s: Traceback: %s" % (str(e), traceback.format_exc())) rc = -1 finally: syslog.closelog() try: sys.exit(rc) except SystemExit: os._exit(rc)