sonic-buildimage/dockers/docker-fpm-quagga/bgpcfgd

348 lines
14 KiB
Plaintext
Raw Normal View History

#!/usr/bin/env python
import sys
import subprocess
import datetime
import time
import syslog
import signal
import traceback
import os
import shutil
from collections import defaultdict
from pprint import pprint
import jinja2
import netaddr
from swsscommon import swsscommon
# Main-loop flag: Daemon.run() keeps iterating while this is True;
# signal_handler() sets it to False to request a shutdown.
g_run = True
# When True, run_command() and Daemon.run() emit additional LOG_DEBUG syslog messages.
g_debug = False
def run_command(command, shell=False):
    """Execute an external command and collect everything it prints.

    :param command: sequence of strings - the program followed by its arguments
    :param shell: passed through unchanged to subprocess.Popen
    :return: tuple (return code, captured stdout, captured stderr)

    A non-zero return code is only reported to syslog (LOG_ERR); no exception
    is raised for it, the caller is expected to inspect the return code.
    """
    printable = " ".join(command)
    if g_debug:
        syslog.syslog(syslog.LOG_DEBUG, "execute command {}.".format(printable))

    proc = subprocess.Popen(command, shell=shell, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    stdout, stderr = proc.communicate()
    rc = proc.returncode

    if rc != 0:
        report = 'command execution returned {}. Command: "{}", stdout: "{}", stderr: "{}"'
        syslog.syslog(syslog.LOG_ERR, report.format(rc, printable, stdout, stderr))
    return rc, stdout, stderr
class TemplateFabric(object):
    """Builds Jinja2 templates that share one environment.

    The environment loads template files from /usr/share/sonic/templates and
    registers two extra filters, 'ipv4' and 'ipv6', backed by is_ipv4() and
    is_ipv6().
    """

    def __init__(self):
        j2_template_paths = ['/usr/share/sonic/templates']
        j2_loader = jinja2.FileSystemLoader(j2_template_paths)
        j2_env = jinja2.Environment(loader=j2_loader, trim_blocks=True)
        j2_env.filters['ipv4'] = self.is_ipv4
        j2_env.filters['ipv6'] = self.is_ipv6
        self.env = j2_env

    def from_file(self, filename):
        """Return the template loaded from 'filename' through the environment loader."""
        return self.env.get_template(filename)

    def from_string(self, tmpl):
        """Return a template compiled from the template source string 'tmpl'."""
        return self.env.from_string(tmpl)

    @staticmethod
    def _ip_version(value):
        """Return the IP version (addr.version) of 'value', or None.

        None is returned for a falsy value and for a value netaddr.IPNetwork
        can't parse. A netaddr.IPNetwork instance is used as is, anything else
        is converted with str() first.
        """
        if not value:
            return None
        if isinstance(value, netaddr.IPNetwork):
            return value.version
        try:
            return netaddr.IPNetwork(str(value)).version
        except Exception:
            # Unparsable input simply means "not an ip address". 'Exception' is used
            # instead of a bare 'except' so SystemExit/KeyboardInterrupt still propagate.
            return None

    @staticmethod
    def is_ipv4(value):
        """Return True if 'value' is an IPv4 address or prefix, False otherwise."""
        return TemplateFabric._ip_version(value) == 4

    @staticmethod
    def is_ipv6(value):
        """Return True if 'value' is an IPv6 address or prefix, False otherwise."""
        return TemplateFabric._ip_version(value) == 6
class BGPConfigManager(object):
    """Translates database updates into vtysh configuration commands.

    The constructor registers four handlers on the daemon:
      * CONFIG_DB device metadata          -> local BGP ASN
      * CONFIG_DB device neighbor metadata -> per-neighbor metadata used by the peer template
      * CONFIG_DB BGP neighbor table       -> add / remove / shutdown / no shutdown of peers
      * STATE_DB interface table           -> "set src" configuration for Loopback0 addresses

    BGP neighbor messages are cached in self.bgp_messages and processed only once
    the local ASN is known.
    """

    def __init__(self, daemon):
        """
        :param daemon: object providing add_manager(db, table_name, callback)
        """
        self.bgp_asn = None      # local ASN, taken from the 'localhost' metadata entry
        self.meta = None         # {'localhost': <metadata>} handed to the peer template
        self.lo_ipv4 = None      # Loopback0 ipv4 address which was already used for "set src"
        self.lo_ipv6 = None      # Loopback0 ipv6 address which was already used for "set src"
        self.neig_meta = {}      # neighbor metadata key -> attributes
        self.bgp_messages = []   # (key, op, data) messages which aren't processed yet
        # The set could contain bgp monitor peers as well. It could be fixed by adding support for them here
        self.peers = self.load_peers()
        fabric = TemplateFabric()
        self.bgp_peer_add_template = fabric.from_file('bgpd.peer.conf.j2')
        self.bgp_peer_del_template = fabric.from_string('no neighbor {{ neighbor_addr }}')
        self.bgp_peer_shutdown = fabric.from_string('neighbor {{ neighbor_addr }} shutdown')
        self.bgp_peer_no_shutdown = fabric.from_string('no neighbor {{ neighbor_addr }} shutdown')
        self.zebra_set_src_template = fabric.from_file('zebra.set_src.conf.j2')
        daemon.add_manager(swsscommon.CONFIG_DB, swsscommon.CFG_DEVICE_METADATA_TABLE_NAME, self.__metadata_handler)
        daemon.add_manager(swsscommon.CONFIG_DB, swsscommon.CFG_DEVICE_NEIGHBOR_METADATA_TABLE_NAME, self.__neighbor_metadata_handler)
        daemon.add_manager(swsscommon.CONFIG_DB, swsscommon.CFG_BGP_NEIGHBOR_TABLE_NAME, self.__bgp_handler)
        daemon.add_manager(swsscommon.STATE_DB, swsscommon.STATE_INTERFACE_TABLE_NAME, self.__if_handler)

    def load_peers(self):
        """Return the set of peers found in "show ip bgp summary" and "show ipv6 bgp summary".

        Only the rows between the "Neighbor V" header and the "Total number of neighbors"
        line are used. Rows starting with '*' are skipped. The peer is the text before
        the first space of a row.
        """
        peers = set()
        for ip_type in ["ip", "ipv6"]:
            command = ["vtysh", "-c", "show %s bgp summary" % ip_type]
            rc, out, err = run_command(command)
            if rc != 0:
                continue
            inside = False
            for line in out.split("\n"):
                if "Neighbor V" in line:
                    inside = True
                elif "Total number of neighbors" in line:
                    break
                elif inside:
                    if line.startswith("*"):
                        continue
                    peer = line.partition(" ")[0]
                    # A blank row, or a row which starts with a space, gives an empty string.
                    # It isn't a peer, don't put it into the set
                    if peer:
                        peers.add(peer)
        return peers

    def __metadata_handler(self, key, op, data):
        """Remember the local ASN from the 'localhost' entry and process cached BGP messages."""
        if key != "localhost" \
           or "bgp_asn" not in data \
           or self.bgp_asn == data["bgp_asn"]:
            return
        # TODO add ASN update commands
        self.meta = { 'localhost': data }
        self.bgp_asn = data["bgp_asn"]
        self.__update_bgp()

    def __neighbor_metadata_handler(self, key, op, data):
        """Keep self.neig_meta in sync with the table, then retry cached BGP messages."""
        if op == swsscommon.SET_COMMAND:
            self.neig_meta[key] = data
        elif op == swsscommon.DEL_COMMAND:
            if key in self.neig_meta:
                del self.neig_meta[key]
            else:
                syslog.syslog(syslog.LOG_ERR,"Can't remove key '%s' from neighbor metadata handler. The key doesn't exist" % key)
        else:
            syslog.syslog(syslog.LOG_ERR,"Wrong operation '%s' for neighbor metadata handler" % op)
        self.__update_bgp()

    def __if_handler(self, key, op, data):
        """Generate "set src" configuration for a Loopback0 address which is in the "ok" state.

        Only the first ipv4 and the first ipv6 address are applied. Updates and deletes
        are not supported and are only logged.
        """
        cmds = []
        if op == swsscommon.SET_COMMAND and key.startswith("Loopback0|") and "state" in data and data["state"] == "ok":
            ip_addr_w_mask = key.replace("Loopback0|", "")
            slash_pos = ip_addr_w_mask.rfind("/")
            if slash_pos == -1:
                syslog.syslog(syslog.LOG_ERR, "Wrong Loopback0 ip prefix '%s'" % ip_addr_w_mask)
                return
            ip_addr = ip_addr_w_mask[:slash_pos]
            try:
                if TemplateFabric.is_ipv4(ip_addr):
                    family = "ipv4"
                    if self.lo_ipv4 is None:
                        self.lo_ipv4 = ip_addr
                        txt = self.zebra_set_src_template.render(rm_name="RM_SET_SRC", lo_ip=ip_addr, ip_proto="")
                    else:
                        syslog.syslog(syslog.LOG_INFO, "Update command is not supported for set src templates. current ip='%s'. new ip='%s'" % (self.lo_ipv4, ip_addr))
                        return
                elif TemplateFabric.is_ipv6(ip_addr):
                    family = "ipv6"
                    if self.lo_ipv6 is None:
                        self.lo_ipv6 = ip_addr
                        txt = self.zebra_set_src_template.render(rm_name="RM_SET_SRC6", lo_ip=ip_addr, ip_proto="v6")
                    else:
                        syslog.syslog(syslog.LOG_INFO, "Update command is not supported for set src templates. current ip='%s'. new ip='%s'" % (self.lo_ipv6, ip_addr))
                        return
                else:
                    syslog.syslog(syslog.LOG_ERR, "Got ambiguous ip address '%s'" % ip_addr)
                    return
            except Exception:
                # 'Exception' instead of a bare 'except': don't swallow SystemExit/KeyboardInterrupt
                syslog.syslog(syslog.LOG_ERR, "Error while rendering set src template '%s'" % ip_addr)
            else:
                cmds.append(txt)
                # The message used to say "ipv4" for ipv6 addresses too
                syslog.syslog(syslog.LOG_INFO, "Generate set src configuration with Loopback0 %s '%s'" % (family, ip_addr))
        elif op == swsscommon.DEL_COMMAND:
            syslog.syslog(syslog.LOG_INFO, "Delete command is not supported for set src templates")
        for cmd in cmds:
            self.__apply_cmd(cmd, zebra=True)

    def __update_bgp(self):
        """Process cached BGP neighbor messages and apply the resulting configuration.

        SET for an unknown peer renders the peer template. The message stays cached while
        the neighbor named in data['name'] has no entry in self.neig_meta.
        SET for a known peer supports only admin_status 'up' / 'down'.
        DEL removes a known peer.
        """
        if self.bgp_asn is None:
            # This method is also called by the neighbor metadata handler, which can fire
            # before the ASN is known. Keep the messages cached: without the ASN the
            # commands would be sent under "router bgp None".
            return
        cmds = []
        new_bgp_messages = []
        for key, op, data in self.bgp_messages:
            if op == swsscommon.SET_COMMAND:
                if key not in self.peers:
                    if 'name' in data and data['name'] not in self.neig_meta:
                        # DEVICE_NEIGHBOR_METADATA should be populated before the rendering
                        new_bgp_messages.append((key, op, data))
                        continue
                    try:
                        txt = self.bgp_peer_add_template.render(DEVICE_METADATA=self.meta, DEVICE_NEIGHBOR_METADATA=self.neig_meta, neighbor_addr=key, bgp_session=data)
                    except Exception:
                        syslog.syslog(syslog.LOG_ERR, 'Peer {}. Error in rendering the template for "SET" command {}'.format(key, data))
                    else:
                        cmds.append(txt)
                        syslog.syslog(syslog.LOG_INFO, 'Peer {} added with attributes {}'.format(key, data))
                        self.peers.add(key)
                else:
                    # when the peer is already configured we support "shutdown/no shutdown"
                    # commands for the peers only
                    if "admin_status" in data:
                        if data['admin_status'] == 'up':
                            cmds.append(self.bgp_peer_no_shutdown.render(neighbor_addr=key))
                            syslog.syslog(syslog.LOG_INFO, 'Peer {} admin state is set to "up"'.format(key))
                        elif data['admin_status'] == 'down':
                            cmds.append(self.bgp_peer_shutdown.render(neighbor_addr=key))
                            syslog.syslog(syslog.LOG_INFO, 'Peer {} admin state is set to "down"'.format(key))
                        else:
                            syslog.syslog(syslog.LOG_ERR, "Peer {}: Can't update the peer. has wrong attribute value attr['admin_status'] = '{}'".format(key, data['admin_status']))
                    else:
                        syslog.syslog(syslog.LOG_INFO, "Peer {}: Can't update the peer. No 'admin_status' attribute in the request".format(key))
            elif op == swsscommon.DEL_COMMAND:
                if key in self.peers:
                    cmds.append(self.bgp_peer_del_template.render(neighbor_addr=key))
                    syslog.syslog(syslog.LOG_INFO, 'Peer {} has been removed'.format(key))
                    self.peers.remove(key)
                else:
                    syslog.syslog(syslog.LOG_WARNING, 'Peer {} is not found'.format(key))
        self.bgp_messages = new_bgp_messages
        for cmd in cmds:
            self.__apply_cmd(cmd)

    @staticmethod
    def _build_vtysh_commands(cmd, bgp_asn, zebra=False):
        """Convert rendered configuration text into a list of vtysh argument lists.

        :param cmd: configuration text, one command per line
        :param bgp_asn: ASN used for the "router bgp" context (ignored when zebra is True)
        :param zebra: when True the commands are issued directly under "conf t"
        :return: list of argument lists, one per configuration line, in the text order

        Lines starting with '!' and blank lines are ignored. The indentation of every
        line is compared with the indentation of the first remaining line:
          * deeper  - the line is appended to the current chain of commands
          * equal   - the line replaces the last command of the chain
          * smaller - the line replaces the last two commands of the chain
        Every line produces one vtysh invocation containing the whole current chain.
        """
        lines = [line for line in cmd.split("\n") if not line.startswith('!') and line.strip() != ""]
        if not lines:
            return []
        prefix = ["vtysh", "-c", "conf t"]
        if not zebra:
            prefix.extend(["-c", "router bgp %s" % bgp_asn])
        offset = len(lines[0]) - len(lines[0].lstrip())
        chunks = []
        commands = []
        # Iterate over the filtered lines. Iterating over the raw text sends comments and
        # blank lines to vtysh and may pop from an empty chain
        for line in lines:
            c_offset = len(line) - len(line.lstrip())
            if c_offset < offset:
                del chunks[-2:]  # a slice never fails, even if the chain is shorter
            elif c_offset == offset:
                del chunks[-1:]
            chunks.append(line.strip())
            command = list(prefix)
            for chunk in chunks:
                command.extend(["-c", chunk])
            commands.append(command)
        return commands

    def __apply_cmd(self, cmd, zebra=False):
        """Apply the rendered configuration text 'cmd' through vtysh.

        NOTE(review): the source this block was restored from had lost its indentation.
        The vtysh call is treated as a part of the per-line loop, otherwise only the
        last line of a template would ever be applied - confirm against the original.
        """
        for command in self._build_vtysh_commands(cmd, self.bgp_asn, zebra):
            run_command(command)

    def __bgp_handler(self, key, op, data):
        """Cache a BGP neighbor message and process the cache if the ASN is already known."""
        self.bgp_messages.append((key, op, data))
        # If ASN is not set, we just cache this message until the ASN is set.
        if self.bgp_asn is not None:
            self.__update_bgp()
class Daemon(object):
    """Dispatches table updates from the databases to the registered callbacks.

    A callback is registered per (database, table) pair with add_manager() and is
    called as callback(key, op, dict(fvs)) for every message popped from the
    subscriber of that table.
    """
    # Timeout passed to Select.select(); presumably milliseconds - TODO confirm
    SELECT_TIMEOUT = 1000
    # Databases a manager is allowed to subscribe to
    DATABASE_LIST = [ swsscommon.CONFIG_DB, swsscommon.STATE_DB ]

    def __init__(self):
        self.db_connectors = { db : swsscommon.DBConnector(db, swsscommon.DBConnector.DEFAULT_UNIXSOCKET, 0) for db in Daemon.DATABASE_LIST }
        self.selector = swsscommon.Select()
        self.callbacks = defaultdict(lambda : defaultdict(list)) # db -> table -> []
        self.subscribers = set()

    def add_manager(self, db, table_name, callback):
        """Register 'callback' for the updates of 'table_name' in database 'db'.

        The first registration for a table creates its subscriber and adds it to the selector.

        :raises ValueError: when 'db' is not one of Daemon.DATABASE_LIST
        """
        if db not in Daemon.DATABASE_LIST:
            # The ids are converted explicitly: str.join() raises TypeError for non-string
            # items, which would hide the intended ValueError
            supported = ",".join(str(db_id) for db_id in Daemon.DATABASE_LIST)
            raise ValueError("database {} isn't supported. Supported '{}' only.".format(db, supported))
        if table_name not in self.callbacks[db]:
            conn = self.db_connectors[db]
            subscriber = swsscommon.SubscriberStateTable(conn, table_name)
            self.subscribers.add(subscriber)
            self.selector.addSelectable(subscriber)
        self.callbacks[db][table_name].append(callback)

    def run(self):
        """Wait for updates and dispatch them until the global g_run flag is cleared.

        :raises Exception: when the selector reports an error
        """
        while g_run:
            state, _ = self.selector.select(Daemon.SELECT_TIMEOUT)
            if state == self.selector.TIMEOUT:
                continue
            elif state == self.selector.ERROR:
                raise Exception("Received error from select")
            # Every subscriber is polled; a subscriber without a message gives an empty key
            for subscriber in self.subscribers:
                key, op, fvs = subscriber.pop()
                if not key:
                    continue
                if g_debug:
                    syslog.syslog(syslog.LOG_DEBUG, "Received message : {}".format((key, op, fvs)))
                for callback in self.callbacks[subscriber.getDbConnector().getDbId()][subscriber.getTableName()]:
                    callback(key, op, dict(fvs))
def wait_for_bgpd():
    """Block until a /usr/lib/quagga/bgpd process shows up in the "ps ax" output.

    The process list is polled every 100 ms for at most 20 seconds.

    :raises RuntimeError: when bgpd wasn't found before the deadline
    """
    # wait for 20 seconds
    deadline = datetime.datetime.now() + datetime.timedelta(seconds=20)
    syslog.syslog(syslog.LOG_INFO, "Start waiting for bgpd: %s" % str(datetime.datetime.now()))
    while datetime.datetime.now() < deadline:
        rc, out, err = run_command(["ps", "ax"])
        bgpd_is_running = (
            rc == 0
            and "bgpd" in out
            and any("/usr/lib/quagga/bgpd" in row for row in out.split("\n"))
        )
        if bgpd_is_running:
            time.sleep(0.01) # wait that bgpd connected to vtysh
            syslog.syslog(syslog.LOG_INFO, "bgpd connected to vtysh: %s" % str(datetime.datetime.now()))
            return
        time.sleep(0.1) # sleep 100 ms
    raise RuntimeError("bgpd hasn't been started in 20 seconds")
def main():
    """Wait for bgpd, wire the BGP configuration manager to the daemon and run the daemon loop."""
    wait_for_bgpd()
    event_daemon = Daemon()
    # The constructor registers the manager's handlers on the daemon
    manager = BGPConfigManager(event_daemon)
    event_daemon.run()
def signal_handler(signum, frame):
    """Signal callback: clear the global g_run flag.

    Both arguments are required by the signal handler signature and are not used.
    """
    global g_run
    g_run = False
# Script entry point.
# NOTE(review): the indentation of this section was reconstructed; the final
# sys.exit/os._exit pair is placed after the try/finally statement - confirm.
if __name__ == '__main__':
    # Exit status: 0 - normal stop, -2 - RuntimeError, -1 - any other exception
    rc = 0
    try:
        syslog.openlog('bgpcfgd')
        # Both signals only clear the g_run flag (see signal_handler)
        signal.signal(signal.SIGTERM, signal_handler)
        signal.signal(signal.SIGINT, signal_handler)
        main()
    except KeyboardInterrupt:
        syslog.syslog(syslog.LOG_NOTICE, "Keyboard interrupt")
    except RuntimeError as e:
        # Must stay before the generic handler: RuntimeError is a subclass of Exception
        syslog.syslog(syslog.LOG_CRIT, "%s" % str(e))
        rc = -2
    except Exception as e:
        syslog.syslog(syslog.LOG_CRIT, "Got an exception %s: Traceback: %s" % (str(e), traceback.format_exc()))
        rc = -1
    finally:
        syslog.closelog()
    try:
        sys.exit(rc)
    except SystemExit:
        # sys.exit() raises SystemExit, which is caught right here, so the process
        # always terminates through os._exit(rc)
        os._exit(rc)