[swss]: Listen for undeliverable tunnel packets (#9348)
- Create a script in the orchagent docker container which listens for these encapsulated packets which are trapped to CPU (indicating that they cannot be routed/no neighbor info exists for the inner packet). When such a packet is received, the script will issue a ping command to the packet's inner destination IP to start the neighbor learning process. - This script is also resilient to portchannel status changes (i.e. interface going up or down). An interface going down does not affect traffic sniffing on interfaces which are still up. When an interface comes back up, we restart the sniffer to start capturing traffic on that interface again.
This commit is contained in:
parent
724037ebc3
commit
a41c15a329
@ -68,7 +68,7 @@ RUN apt-get purge -y \
|
||||
COPY ["files/arp_update", "/usr/bin"]
|
||||
COPY ["arp_update.conf", "files/arp_update_vars.j2", "/usr/share/sonic/templates/"]
|
||||
COPY ["ndppd.conf", "/usr/share/sonic/templates/"]
|
||||
COPY ["enable_counters.py", "/usr/bin"]
|
||||
COPY ["enable_counters.py", "tunnel_packet_handler.py", "/usr/bin/"]
|
||||
COPY ["docker-init.sh", "orchagent.sh", "swssconfig.sh", "buffermgrd.sh", "/usr/bin/"]
|
||||
COPY ["supervisord.conf", "/etc/supervisor/conf.d/"]
|
||||
COPY ["files/supervisor-proc-exit-listener", "/usr/bin"]
|
||||
|
@ -206,3 +206,12 @@ stderr_logfile=syslog
|
||||
dependent_startup=true
|
||||
dependent_startup_wait_for=swssconfig:exited
|
||||
|
||||
[program:tunnel_packet_handler]
|
||||
command=/usr/bin/tunnel_packet_handler.py
|
||||
priority=12
|
||||
autostart=false
|
||||
autorestart=unexpected
|
||||
stdout_logfile=syslog
|
||||
stderr_logfile=syslog
|
||||
dependent_startup=true
|
||||
dependent_startup_wait_for=swssconfig:exited
|
||||
|
326
dockers/docker-orchagent/tunnel_packet_handler.py
Executable file
326
dockers/docker-orchagent/tunnel_packet_handler.py
Executable file
@ -0,0 +1,326 @@
|
||||
#! /usr/bin/env python3
|
||||
"""
|
||||
Adds neighbor to kernel for undeliverable tunnel packets
|
||||
|
||||
When receiving tunnel packets, if the hardware doesn't contain neighbor
|
||||
information for the inner packet's destination IP, the entire encapsulated
|
||||
packet is trapped to the CPU. In this case, we should ping the inner
|
||||
destination IP to trigger the process of obtaining neighbor information
|
||||
"""
|
||||
import subprocess
|
||||
import time
|
||||
|
||||
from datetime import datetime
|
||||
from ipaddress import ip_interface
|
||||
from pyroute2 import IPRoute
|
||||
from scapy.layers.inet import IP
|
||||
from scapy.layers.inet6 import IPv6
|
||||
from scapy.sendrecv import AsyncSniffer
|
||||
from swsssdk import ConfigDBConnector, SonicV2Connector
|
||||
from sonic_py_common import logger as log
|
||||
|
||||
logger = log.Logger()
|
||||
|
||||
STATE_DB = 'STATE_DB'
|
||||
PORTCHANNEL_INTERFACE_TABLE = 'PORTCHANNEL_INTERFACE'
|
||||
TUNNEL_TABLE = 'TUNNEL'
|
||||
PEER_SWITCH_TABLE = 'PEER_SWITCH'
|
||||
INTF_TABLE_TEMPLATE = 'INTERFACE_TABLE|{}|{}'
|
||||
STATE_KEY = 'state'
|
||||
TUNNEL_TYPE_KEY = 'tunnel_type'
|
||||
DST_IP_KEY = 'dst_ip'
|
||||
ADDRESS_IPV4_KEY = 'address_ipv4'
|
||||
IPINIP_TUNNEL = 'ipinip'
|
||||
|
||||
RTM_NEWLINK = 'RTM_NEWLINK'
|
||||
|
||||
|
||||
class TunnelPacketHandler(object):
|
||||
|
||||
def __init__(self):
|
||||
self.config_db = ConfigDBConnector()
|
||||
self.config_db.connect()
|
||||
self.state_db = SonicV2Connector()
|
||||
self.state_db.connect(STATE_DB)
|
||||
self._portchannel_intfs = None
|
||||
self.up_portchannels = None
|
||||
self.netlink_api = IPRoute()
|
||||
|
||||
@property
|
||||
def portchannel_intfs(self):
|
||||
"""
|
||||
Gets all portchannel interfaces and IPv4 addresses in config DB
|
||||
|
||||
Returns:
|
||||
(list) Tuples of a portchannel interface name (str) and
|
||||
associated IPv4 address (str)
|
||||
"""
|
||||
if self._portchannel_intfs is None:
|
||||
intf_keys = self.config_db.get_keys(PORTCHANNEL_INTERFACE_TABLE)
|
||||
portchannel_intfs = []
|
||||
|
||||
for key in intf_keys:
|
||||
if isinstance(key, tuple) and len(key) > 1:
|
||||
if ip_interface(key[1]).version == 4:
|
||||
portchannel_intfs.append(key)
|
||||
|
||||
self._portchannel_intfs = portchannel_intfs
|
||||
|
||||
return self._portchannel_intfs
|
||||
|
||||
def get_portchannel_index_mapping(self):
|
||||
"""
|
||||
Gets a mapping of interface kernel indices to portchannel interfaces
|
||||
|
||||
Returns:
|
||||
(list) integers representing kernel indices
|
||||
"""
|
||||
index_map = {}
|
||||
for portchannel in self.portchannel_intfs:
|
||||
index = self.netlink_api.link_lookup(ifname=portchannel[0])[0]
|
||||
index_map[index] = portchannel
|
||||
|
||||
return index_map
|
||||
|
||||
def get_up_portchannels(self):
|
||||
"""
|
||||
Returns the portchannels which are operationally up
|
||||
|
||||
Returns:
|
||||
(list) of interface names which are up, as strings
|
||||
"""
|
||||
pc_index_map = self.get_portchannel_index_mapping()
|
||||
pc_indices = list(pc_index_map.keys())
|
||||
link_statuses = self.netlink_api.get_links(*pc_indices)
|
||||
up_portchannels = []
|
||||
|
||||
for status in link_statuses:
|
||||
if status['state'] == 'up':
|
||||
port_index = status['index']
|
||||
up_portchannels.append(pc_index_map[port_index][0])
|
||||
|
||||
return up_portchannels
|
||||
|
||||
def all_portchannels_established(self):
|
||||
"""
|
||||
Checks if the portchannel interfaces are established
|
||||
|
||||
Note that this status does not indicate operational state
|
||||
Returns:
|
||||
(bool) True, if all interfaces are established
|
||||
False, otherwise
|
||||
"""
|
||||
intfs = self.portchannel_intfs
|
||||
for intf in intfs:
|
||||
intf_table_name = INTF_TABLE_TEMPLATE.format(intf[0], intf[1])
|
||||
intf_state = self.state_db.get(
|
||||
STATE_DB,
|
||||
intf_table_name,
|
||||
STATE_KEY
|
||||
)
|
||||
|
||||
if intf_state and intf_state.lower() != 'ok':
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
def wait_for_portchannels(self, interval=5, timeout=60):
|
||||
"""
|
||||
Continuosly checks if all portchannel host interfaces are established
|
||||
|
||||
Args:
|
||||
interval: the interval (in seconds) at which to perform the check
|
||||
timeout: maximum allowed duration (in seconds) to wait for
|
||||
interfaces to come up
|
||||
|
||||
Raises:
|
||||
RuntimeError if the timeout duration is reached and interfaces are
|
||||
still not up
|
||||
"""
|
||||
start = datetime.now()
|
||||
|
||||
while (datetime.now() - start).seconds < timeout:
|
||||
if self.all_portchannels_established():
|
||||
logger.log_info("All portchannel intfs are established")
|
||||
return None
|
||||
logger.log_info("Not all portchannel intfs are established")
|
||||
time.sleep(interval)
|
||||
|
||||
raise RuntimeError('Portchannel intfs were not established '
|
||||
'within {}'.format(timeout))
|
||||
|
||||
def get_ipinip_tunnel_addrs(self):
|
||||
"""
|
||||
Get the IP addresses used for the IPinIP tunnel
|
||||
|
||||
These should be the Loopback0 addresses for this device and the
|
||||
peer device
|
||||
|
||||
Returns:
|
||||
((str) self_loopback_ip, (str) peer_loopback_ip)
|
||||
or
|
||||
(None, None) If the tunnel type is not IPinIP
|
||||
or
|
||||
if an error is encountered. This most likely means
|
||||
the host device is not a dual ToR device
|
||||
"""
|
||||
try:
|
||||
peer_switch = self.config_db.get_keys(PEER_SWITCH_TABLE)[0]
|
||||
tunnel = self.config_db.get_keys(TUNNEL_TABLE)[0]
|
||||
except IndexError:
|
||||
logger.log_warning('PEER_SWITCH or TUNNEL table'
|
||||
'not found in config DB')
|
||||
return None, None
|
||||
|
||||
try:
|
||||
tunnel_table = self.config_db.get_entry(TUNNEL_TABLE, tunnel)
|
||||
tunnel_type = tunnel_table[TUNNEL_TYPE_KEY].lower()
|
||||
self_loopback_ip = tunnel_table[DST_IP_KEY]
|
||||
peer_loopback_ip = self.config_db.get_entry(
|
||||
PEER_SWITCH_TABLE, peer_switch
|
||||
)[ADDRESS_IPV4_KEY]
|
||||
except KeyError as e:
|
||||
logger.log_warning(
|
||||
'PEER_SWITCH or TUNNEL table missing data, '
|
||||
'could not find key {}'
|
||||
.format(e)
|
||||
)
|
||||
return None, None
|
||||
|
||||
if tunnel_type == IPINIP_TUNNEL:
|
||||
return self_loopback_ip, peer_loopback_ip
|
||||
|
||||
return None, None
|
||||
|
||||
def get_inner_pkt_type(self, packet):
|
||||
"""
|
||||
Get the type of an inner encapsulated packet
|
||||
|
||||
Returns:
|
||||
(str) 'v4' if the inner packet is IPv4
|
||||
(str) 'v6' if the inner packet is IPv6
|
||||
(bool) False if `packet` is not an IPinIP packet
|
||||
"""
|
||||
if packet.haslayer(IP):
|
||||
# Determine inner packet type based on IP protocol number
|
||||
# The outer packet type should always be IPv4
|
||||
if packet[IP].proto == 4:
|
||||
return IP
|
||||
elif packet[IP].proto == 41:
|
||||
return IPv6
|
||||
return False
|
||||
|
||||
def wait_for_netlink_msgs(self):
|
||||
"""
|
||||
Gathers any RTM_NEWLINK messages
|
||||
|
||||
Returns:
|
||||
(list) containing any received messages
|
||||
"""
|
||||
msgs = []
|
||||
with IPRoute() as ipr:
|
||||
ipr.bind()
|
||||
for msg in ipr.get():
|
||||
if msg['event'] == RTM_NEWLINK:
|
||||
msgs.append(msg)
|
||||
|
||||
return msgs
|
||||
|
||||
def sniffer_restart_required(self, messages):
|
||||
"""
|
||||
Determines if the packet sniffer needs to be restarted
|
||||
|
||||
A restart is required if all of the following conditions are met:
|
||||
1. A netlink message of type RTM_NEWLINK is received
|
||||
(this is checked by `wait_for_netlink_msgs`)
|
||||
2. The interface index of the message corresponds to a portchannel
|
||||
interface
|
||||
3. The state of the interface in the message is 'up'
|
||||
Here, we do not care about an interface going down since
|
||||
the sniffer is able to continue sniffing on the other
|
||||
interfaces. However, if an interface has gone down and
|
||||
come back up, we need to restart the sniffer to be able
|
||||
to sniff traffic on the interface that has come back up.
|
||||
"""
|
||||
pc_index_map = self.get_portchannel_index_mapping()
|
||||
for msg in messages:
|
||||
if msg['index'] in pc_index_map:
|
||||
if msg['state'] == 'up':
|
||||
logger.log_info('{} came back up, sniffer restart required'
|
||||
.format(pc_index_map[msg['index']]))
|
||||
return True
|
||||
return False
|
||||
|
||||
def listen_for_tunnel_pkts(self):
|
||||
"""
|
||||
Listens for tunnel packets that are trapped to CPU
|
||||
|
||||
These packets may be trapped if there is no neighbor info for the
|
||||
inner packet destination IP in the hardware.
|
||||
"""
|
||||
|
||||
def _ping_inner_dst(packet):
|
||||
"""
|
||||
Pings the inner destination IP for an encapsulated packet
|
||||
|
||||
Args:
|
||||
packet: The encapsulated packet received
|
||||
"""
|
||||
inner_packet_type = self.get_inner_pkt_type(packet)
|
||||
if inner_packet_type and packet[IP].dst == self_ip:
|
||||
cmds = ['timeout', '0.2', 'ping', '-c1',
|
||||
'-W1', '-i0', '-n', '-q']
|
||||
if inner_packet_type == IPv6:
|
||||
cmds.append('-6')
|
||||
dst_ip = packet[IP].payload[inner_packet_type].dst
|
||||
cmds.append(dst_ip)
|
||||
logger.log_info("Running command '{}'".format(' '.join(cmds)))
|
||||
subprocess.run(cmds, stdout=subprocess.DEVNULL)
|
||||
|
||||
self_ip, peer_ip = self.get_ipinip_tunnel_addrs()
|
||||
if self_ip is None or peer_ip is None:
|
||||
logger.log_notice('Could not get tunnel addresses from '
|
||||
'config DB, exiting...')
|
||||
return None
|
||||
|
||||
packet_filter = 'host {} and host {}'.format(self_ip, peer_ip)
|
||||
logger.log_notice('Starting tunnel packet handler for {}'
|
||||
.format(packet_filter))
|
||||
|
||||
sniff_intfs = self.get_up_portchannels()
|
||||
logger.log_info("Listening on interfaces {}".format(sniff_intfs))
|
||||
|
||||
sniffer = AsyncSniffer(
|
||||
iface=sniff_intfs,
|
||||
filter=packet_filter,
|
||||
prn=_ping_inner_dst
|
||||
|
||||
)
|
||||
sniffer.start()
|
||||
while True:
|
||||
msgs = self.wait_for_netlink_msgs()
|
||||
if self.sniffer_restart_required(msgs):
|
||||
sniffer.stop()
|
||||
sniff_intfs = self.get_up_portchannels()
|
||||
logger.log_notice('Restarting tunnel packet handler on '
|
||||
'interfaces {}'.format(sniff_intfs))
|
||||
sniffer = AsyncSniffer(
|
||||
iface=sniff_intfs,
|
||||
filter=packet_filter,
|
||||
prn=_ping_inner_dst
|
||||
)
|
||||
sniffer.start()
|
||||
|
||||
def run(self):
|
||||
self.wait_for_portchannels()
|
||||
self.listen_for_tunnel_pkts()
|
||||
|
||||
|
||||
def main():
|
||||
logger.set_min_log_priority_info()
|
||||
handler = TunnelPacketHandler()
|
||||
handler.run()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
Loading…
Reference in New Issue
Block a user