diff --git a/dockers/docker-orchagent/Dockerfile.j2 b/dockers/docker-orchagent/Dockerfile.j2
index 2a927b5f2b..5ad150a2bd 100755
--- a/dockers/docker-orchagent/Dockerfile.j2
+++ b/dockers/docker-orchagent/Dockerfile.j2
@@ -68,7 +68,7 @@ RUN apt-get purge -y \
 COPY ["files/arp_update", "/usr/bin"]
 COPY ["arp_update.conf", "files/arp_update_vars.j2", "/usr/share/sonic/templates/"]
 COPY ["ndppd.conf", "/usr/share/sonic/templates/"]
-COPY ["enable_counters.py", "/usr/bin"]
+COPY ["enable_counters.py", "tunnel_packet_handler.py", "/usr/bin/"]
 COPY ["docker-init.sh", "orchagent.sh", "swssconfig.sh", "buffermgrd.sh", "/usr/bin/"]
 COPY ["files/supervisor-proc-exit-listener", "/usr/bin"]
 
diff --git a/dockers/docker-orchagent/supervisord.conf.j2 b/dockers/docker-orchagent/supervisord.conf.j2
index 0ba9b8f480..51ead0db7c 100644
--- a/dockers/docker-orchagent/supervisord.conf.j2
+++ b/dockers/docker-orchagent/supervisord.conf.j2
@@ -242,3 +242,15 @@ stderr_logfile=syslog
 dependent_startup=true
 dependent_startup_wait_for=swssconfig:exited
 {%- endif %}
+
+{% if is_fabric_asic == 0 %}
+[program:tunnel_packet_handler]
+command=/usr/bin/tunnel_packet_handler.py
+priority=12
+autostart=false
+autorestart=unexpected
+stdout_logfile=syslog
+stderr_logfile=syslog
+dependent_startup=true
+dependent_startup_wait_for=swssconfig:exited
+{%- endif %}
diff --git a/dockers/docker-orchagent/tunnel_packet_handler.py b/dockers/docker-orchagent/tunnel_packet_handler.py
new file mode 100755
index 0000000000..7d9f7f5612
--- /dev/null
+++ b/dockers/docker-orchagent/tunnel_packet_handler.py
@@ -0,0 +1,329 @@
+#! /usr/bin/env python3
+"""
+Adds neighbor to kernel for undeliverable tunnel packets
+
+When receiving tunnel packets, if the hardware doesn't contain neighbor
+information for the inner packet's destination IP, the entire encapsulated
+packet is trapped to the CPU. In this case, we should ping the inner
+destination IP to trigger the process of obtaining neighbor information
+"""
+import subprocess
+import time
+
+from datetime import datetime
+from ipaddress import ip_interface
+from pyroute2 import IPRoute
+from scapy.layers.inet import IP
+from scapy.layers.inet6 import IPv6
+from scapy.sendrecv import AsyncSniffer
+from swsssdk import ConfigDBConnector, SonicV2Connector
+from sonic_py_common import logger as log
+
+logger = log.Logger()
+
+STATE_DB = 'STATE_DB'
+PORTCHANNEL_INTERFACE_TABLE = 'PORTCHANNEL_INTERFACE'
+TUNNEL_TABLE = 'TUNNEL'
+PEER_SWITCH_TABLE = 'PEER_SWITCH'
+INTF_TABLE_TEMPLATE = 'INTERFACE_TABLE|{}|{}'
+STATE_KEY = 'state'
+TUNNEL_TYPE_KEY = 'tunnel_type'
+DST_IP_KEY = 'dst_ip'
+ADDRESS_IPV4_KEY = 'address_ipv4'
+IPINIP_TUNNEL = 'ipinip'
+
+RTM_NEWLINK = 'RTM_NEWLINK'
+
+
+class TunnelPacketHandler(object):
+
+    def __init__(self):
+        """Connects to config DB and STATE_DB and creates a netlink handle"""
+        self.config_db = ConfigDBConnector()
+        self.config_db.connect()
+        self.state_db = SonicV2Connector()
+        self.state_db.connect(STATE_DB)
+        self._portchannel_intfs = None
+        self.up_portchannels = None
+        self.netlink_api = IPRoute()
+
+    @property
+    def portchannel_intfs(self):
+        """
+        Gets all portchannel interfaces and IPv4 addresses in config DB
+
+        Returns:
+            (list) Tuples of a portchannel interface name (str) and
+                   associated IPv4 address (str)
+        """
+        if self._portchannel_intfs is None:
+            intf_keys = self.config_db.get_keys(PORTCHANNEL_INTERFACE_TABLE)
+            portchannel_intfs = []
+
+            for key in intf_keys:
+                if isinstance(key, tuple) and len(key) > 1:
+                    if ip_interface(key[1]).version == 4:
+                        portchannel_intfs.append(key)
+
+            self._portchannel_intfs = portchannel_intfs
+
+        return self._portchannel_intfs
+
+    def get_portchannel_index_mapping(self):
+        """
+        Gets a mapping of interface kernel indices to portchannel interfaces
+
+        Returns:
+            (dict) kernel index (int) -> portchannel (name, address) tuple
+        """
+        index_map = {}
+        for portchannel in self.portchannel_intfs:
+            index = self.netlink_api.link_lookup(ifname=portchannel[0])[0]
+            index_map[index] = portchannel
+
+        return index_map
+
+    def get_up_portchannels(self):
+        """
+        Returns the portchannels which are operationally up
+
+        Returns:
+            (list) of interface names which are up, as strings
+        """
+        pc_index_map = self.get_portchannel_index_mapping()
+        pc_indices = list(pc_index_map.keys())
+        link_statuses = self.netlink_api.get_links(*pc_indices)
+        up_portchannels = []
+
+        for status in link_statuses:
+            if status['state'] == 'up':
+                port_index = status['index']
+                up_portchannels.append(pc_index_map[port_index][0])
+
+        return up_portchannels
+
+    def all_portchannels_established(self):
+        """
+        Checks if the portchannel interfaces are established
+
+        Note that this status does not indicate operational state
+        Returns:
+            (bool) True, if all interfaces are established
+                   False, otherwise
+        """
+        intfs = self.portchannel_intfs
+        for intf in intfs:
+            intf_table_name = INTF_TABLE_TEMPLATE.format(intf[0], intf[1])
+            intf_state = self.state_db.get(
+                STATE_DB,
+                intf_table_name,
+                STATE_KEY
+            )
+            # NOTE(review): an empty/missing state is not treated as a failure
+            if intf_state and intf_state.lower() != 'ok':
+                return False
+
+        return True
+
+    def wait_for_portchannels(self, interval=5, timeout=60):
+        """
+        Continuously checks if all portchannel host interfaces are established
+
+        Args:
+            interval: the interval (in seconds) at which to perform the check
+            timeout: maximum allowed duration (in seconds) to wait for
+                     interfaces to come up
+
+        Raises:
+            RuntimeError if the timeout duration is reached and interfaces are
+                still not up
+        """
+        start = datetime.now()
+
+        while (datetime.now() - start).total_seconds() < timeout:
+            if self.all_portchannels_established():
+                logger.log_info("All portchannel intfs are established")
+                return None
+            logger.log_info("Not all portchannel intfs are established")
+            time.sleep(interval)
+
+        raise RuntimeError('Portchannel intfs were not established '
+                           'within {}'.format(timeout))
+
+    def get_ipinip_tunnel_addrs(self):
+        """
+        Get the IP addresses used for the IPinIP tunnel
+
+        These should be the Loopback0 addresses for this device and the
+        peer device
+
+        Returns:
+            ((str) self_loopback_ip, (str) peer_loopback_ip)
+            or
+            (None, None) If the tunnel type is not IPinIP
+                         or
+                         if an error is encountered. This most likely means
+                         the host device is not a dual ToR device
+        """
+        try:
+            peer_switch = self.config_db.get_keys(PEER_SWITCH_TABLE)[0]
+            tunnel = self.config_db.get_keys(TUNNEL_TABLE)[0]
+        except IndexError:
+            logger.log_warning('PEER_SWITCH or TUNNEL table '
+                               'not found in config DB')
+            return None, None
+
+        try:
+            tunnel_table = self.config_db.get_entry(TUNNEL_TABLE, tunnel)
+            tunnel_type = tunnel_table[TUNNEL_TYPE_KEY].lower()
+            self_loopback_ip = tunnel_table[DST_IP_KEY]
+            peer_loopback_ip = self.config_db.get_entry(
+                PEER_SWITCH_TABLE, peer_switch
+            )[ADDRESS_IPV4_KEY]
+        except KeyError as e:
+            logger.log_warning(
+                'PEER_SWITCH or TUNNEL table missing data, '
+                'could not find key {}'
+                .format(e)
+            )
+            return None, None
+
+        if tunnel_type == IPINIP_TUNNEL:
+            return self_loopback_ip, peer_loopback_ip
+
+        return None, None
+
+    def get_inner_pkt_type(self, packet):
+        """
+        Get the type of an inner encapsulated packet
+
+        Returns:
+            scapy `IP` class if the inner packet is IPv4
+            scapy `IPv6` class if the inner packet is IPv6
+            (bool) False if `packet` is not an IPinIP packet
+        """
+        if packet.haslayer(IP):
+            # Determine inner packet type based on IP protocol number
+            # The outer packet type should always be IPv4
+            if packet[IP].proto == 4:
+                return IP
+            elif packet[IP].proto == 41:
+                return IPv6
+        return False
+
+    def wait_for_netlink_msgs(self):
+        """
+        Gathers any RTM_NEWLINK messages
+
+        Returns:
+            (list) containing any received messages
+        """
+        msgs = []
+        with IPRoute() as ipr:
+            ipr.bind()
+            for msg in ipr.get():
+                if msg['event'] == RTM_NEWLINK:
+                    msgs.append(msg)
+
+        return msgs
+
+    def sniffer_restart_required(self, messages):
+        """
+        Determines if the packet sniffer needs to be restarted
+
+        A restart is required if all of the following conditions are met:
+            1. A netlink message of type RTM_NEWLINK is received
+               (this is checked by `wait_for_netlink_msgs`)
+            2. The interface index of the message corresponds to a portchannel
+               interface
+            3. The state of the interface in the message is 'up'
+                    Here, we do not care about an interface going down since
+                    the sniffer is able to continue sniffing on the other
+                    interfaces. However, if an interface has gone down and
+                    come back up, we need to restart the sniffer to be able
+                    to sniff traffic on the interface that has come back up.
+        """
+        pc_index_map = self.get_portchannel_index_mapping()
+        for msg in messages:
+            if msg['index'] in pc_index_map:
+                if msg['state'] == 'up':
+                    logger.log_info('{} came back up, sniffer restart required'
+                                    .format(pc_index_map[msg['index']]))
+                    return True
+        return False
+
+    def listen_for_tunnel_pkts(self):
+        """
+        Listens for tunnel packets that are trapped to CPU
+
+        These packets may be trapped if there is no neighbor info for the
+        inner packet destination IP in the hardware.
+        """
+
+        def _ping_inner_dst(packet):
+            """
+            Pings the inner destination IP for an encapsulated packet
+
+            Args:
+                packet: The encapsulated packet received
+            """
+            inner_packet_type = self.get_inner_pkt_type(packet)
+            if inner_packet_type and packet[IP].dst == self_ip:
+                cmds = ['timeout', '0.2', 'ping', '-c1',
+                        '-W1', '-i0', '-n', '-q']
+                if inner_packet_type == IPv6:
+                    cmds.append('-6')
+                dst_ip = packet[IP].payload[inner_packet_type].dst
+                cmds.append(dst_ip)
+                logger.log_info("Running command '{}'".format(' '.join(cmds)))
+                subprocess.run(cmds, stdout=subprocess.DEVNULL)
+
+        self_ip, peer_ip = self.get_ipinip_tunnel_addrs()
+        if self_ip is None or peer_ip is None:
+            logger.log_notice('Could not get tunnel addresses from '
+                              'config DB, exiting...')
+            return None
+
+        packet_filter = 'host {} and host {}'.format(self_ip, peer_ip)
+        logger.log_notice('Starting tunnel packet handler for {}'
+                          .format(packet_filter))
+
+        sniff_intfs = self.get_up_portchannels()
+        logger.log_info("Listening on interfaces {}".format(sniff_intfs))
+
+        sniffer = AsyncSniffer(
+            iface=sniff_intfs,
+            filter=packet_filter,
+            prn=_ping_inner_dst
+
+        )
+        sniffer.start()
+        while True:
+            msgs = self.wait_for_netlink_msgs()
+            if self.sniffer_restart_required(msgs):
+                sniffer.stop()
+                sniff_intfs = self.get_up_portchannels()
+                logger.log_notice('Restarting tunnel packet handler on '
+                                  'interfaces {}'.format(sniff_intfs))
+                sniffer = AsyncSniffer(
+                    iface=sniff_intfs,
+                    filter=packet_filter,
+                    prn=_ping_inner_dst
+                )
+                sniffer.start()
+
+    def run(self):
+        """Waits for the portchannel intfs, then listens for tunnel packets"""
+        self.wait_for_portchannels()
+        self.listen_for_tunnel_pkts()
+
+
+def main():
+    """Sets the minimum log priority to info and runs the handler"""
+    logger.set_min_log_priority_info()
+    handler = TunnelPacketHandler()
+    handler.run()
+
+
+if __name__ == "__main__":
+    main()