Feature (contd.): Device Emulation - added test case for IPv6 - currently failing pending IPv6 implementation

This commit is contained in:
Srivats P 2015-12-30 18:25:13 +05:30
parent 12d17ef6de
commit ea68b42059

View File

@ -1,6 +1,7 @@
#! /usr/bin/env python #! /usr/bin/env python
# standard modules # standard modules
import ipaddress
import logging import logging
import os import os
import re import re
@ -18,12 +19,13 @@ from core import ost_pb, emul, DroneProxy
from rpc import RpcError from rpc import RpcError
from protocols.mac_pb2 import mac, Mac from protocols.mac_pb2 import mac, Mac
from protocols.ip4_pb2 import ip4, Ip4 from protocols.ip4_pb2 import ip4, Ip4
from protocols.ip6_pb2 import ip6, Ip6
from protocols.vlan_pb2 import vlan from protocols.vlan_pb2 import vlan
use_defaults = True use_defaults = True
if sys.platform == 'win32': if sys.platform == 'win32':
tshark = r'C:\Program Files\Wireshark\tshark.exe' tshark = r'C:\PortableTools\WiresharkPortable\App\Wireshark\tshark.exe'
else: else:
tshark = 'tshark' tshark = 'tshark'
@ -71,6 +73,24 @@ if not use_defaults:
env.host_string = s or env.host_string env.host_string = s or env.host_string
# FIXME: get inputs for dut rx/tx ports # FIXME: get inputs for dut rx/tx ports
# Convenience class to interwork with OstEmul::Ip6Address() and
# the python ipaddress module
class ip6_address(ipaddress.IPv6Interface):
def __init__(self, addr):
super(ip6_address, self).__init__(unicode(addr))
self.ip6 = emul.Ip6Address()
self.ip6.hi = int(self) >> 64
self.ip6.lo = int(self) & 0xffffffffffffffff
self.prefixlen = self.network.prefixlen
# we assume gateway is the lowest IP host address in the network
gateway = self.network.network_address + 1
self.gateway = emul.Ip6Address()
self.ip6.hi = int(gateway) >> 64
self.ip6.lo = int(gateway) & 0xffffffffffffffff
# ================================================================= # # ================================================================= #
# ----------------------------------------------------------------- # # ----------------------------------------------------------------- #
# FIXTURES # FIXTURES
@ -196,37 +216,25 @@ def dgid_list(request, drone, ports):
return dgid_list return dgid_list
@pytest.fixture(scope='module') @pytest.fixture(scope='module')
def stream_id(request, drone, ports): def stream_clear(request, drone, ports):
# ----------------------------------------------------------------- #
# create stream on tx port - each test case will modify and reuse
# this stream as per its needs
# ----------------------------------------------------------------- #
# delete existing streams, if any, on tx port # delete existing streams, if any, on tx port
sid_list = drone.getStreamIdList(ports.tx.port_id[0]) sid_list = drone.getStreamIdList(ports.tx.port_id[0])
drone.deleteStream(sid_list) drone.deleteStream(sid_list)
# add a stream
stream_id = ost_pb.StreamIdList()
stream_id.port_id.CopyFrom(ports.tx.port_id[0])
stream_id.stream_id.add().id = 1
log.info('adding tx_stream %d' % stream_id.stream_id[0].id)
drone.addStream(stream_id)
def fin():
drone.deleteStream(stream_id)
request.addfinalizer(fin)
return stream_id
@pytest.fixture @pytest.fixture
def dut_ip(request, dut_ports): def dut_ip(request, dut_ports):
sudo('ip address add 10.10.1.1/24 dev ' + dut_ports.rx) sudo('ip address add 10.10.1.1/24 dev ' + dut_ports.rx)
sudo('ip address add 10.10.2.1/24 dev ' + dut_ports.tx) sudo('ip address add 10.10.2.1/24 dev ' + dut_ports.tx)
sudo('ip -6 address add 1234:1::1/96 dev ' + dut_ports.rx)
sudo('ip -6 address add 1234:2::1/96 dev ' + dut_ports.tx)
def fin(): def fin():
sudo('ip address delete 10.10.1.1/24 dev ' + dut_ports.rx) sudo('ip address delete 10.10.1.1/24 dev ' + dut_ports.rx)
sudo('ip address delete 10.10.2.1/24 dev ' + dut_ports.tx) sudo('ip address delete 10.10.2.1/24 dev ' + dut_ports.tx)
sudo('ip -6 address delete 1234:1::1/96 dev ' + dut_ports.rx)
sudo('ip -6 address delete 1234:2::1/96 dev ' + dut_ports.tx)
request.addfinalizer(fin) request.addfinalizer(fin)
@pytest.fixture @pytest.fixture
@ -333,23 +341,26 @@ def dut_vlans(request, dut_ports):
# ================================================================= # # ================================================================= #
@pytest.mark.parametrize('dev_cfg', [ @pytest.mark.parametrize('dev_cfg', [
{'mac_step': 1, 'ip_step': 1}, {'ip_ver': [6], 'mac_step': 1, 'ip_step': 1},
{'mac_step': 2, 'ip_step': 5}, {'ip_ver': [4, 6], 'mac_step': 2, 'ip_step': 5},
]) ])
def test_multiEmulDevNoVlan(drone, ports, dut, dut_ports, dut_ip, def test_multiEmulDevNoVlan(drone, ports, dut, dut_ports, dut_ip,
stream_id, emul_ports, dgid_list, dev_cfg): stream_clear, emul_ports, dgid_list, dev_cfg):
# ----------------------------------------------------------------- # # ----------------------------------------------------------------- #
# TESTCASE: Emulate multiple IPv4 devices (no vlans) # TESTCASE: Emulate multiple IPv4 devices (no vlans)
# DUT # DUT
# /.1 \.1 # /.1 \.1
# / \ # / \
# 10.10.1/24 10.10.2/24 # 10.10.1/24 10.10.2/24
# 1234::1/96 1234::2/96
# / \ # / \
# /.101-105 \.101-105 # /.101-105 \.101-105
# Host1(s) Host2(s) # Host1(s) Host2(s)
# ----------------------------------------------------------------- # # ----------------------------------------------------------------- #
num_devs = 5 num_devs = 5
has_ip4 = True if 4 in dev_cfg['ip_ver'] else False
has_ip6 = True if 6 in dev_cfg['ip_ver'] else False
mac_step = dev_cfg['mac_step'] mac_step = dev_cfg['mac_step']
ip_step = dev_cfg['ip_step'] ip_step = dev_cfg['ip_step']
@ -363,12 +374,21 @@ def test_multiEmulDevNoVlan(drone, ports, dut, dut_ports, dut_ip,
dg.Extensions[emul.mac].address = 0x000102030a01 dg.Extensions[emul.mac].address = 0x000102030a01
if (mac_step != 1): if (mac_step != 1):
dg.Extensions[emul.mac].step = mac_step dg.Extensions[emul.mac].step = mac_step
if has_ip4:
ip = dg.Extensions[emul.ip4] ip = dg.Extensions[emul.ip4]
ip.address = 0x0a0a0165 ip.address = 0x0a0a0165
ip.prefix_length = 24 ip.prefix_length = 24
if (ip_step != 1): if (ip_step != 1):
ip.step = ip_step ip.step = ip_step
ip.default_gateway = 0x0a0a0101 ip.default_gateway = 0x0a0a0101
if has_ip6:
ip = dg.Extensions[emul.ip6]
ip6addr = ip6_address('1234:1::65/96')
ip.address.CopyFrom(ip6addr.ip6)
ip.prefix_length = ip6addr.prefixlen
if (ip_step != 1):
ip.step.CopyFrom(ip6_address(ip_step).ip6)
ip.default_gateway.CopyFrom(ip6addr.gateway)
drone.modifyDeviceGroup(devgrp_cfg) drone.modifyDeviceGroup(devgrp_cfg)
@ -382,25 +402,51 @@ def test_multiEmulDevNoVlan(drone, ports, dut, dut_ports, dut_ip,
dg.Extensions[emul.mac].address = 0x000102030b01 dg.Extensions[emul.mac].address = 0x000102030b01
if (mac_step != 1): if (mac_step != 1):
dg.Extensions[emul.mac].step = mac_step dg.Extensions[emul.mac].step = mac_step
if has_ip4:
ip = dg.Extensions[emul.ip4] ip = dg.Extensions[emul.ip4]
ip.address = 0x0a0a0265 ip.address = 0x0a0a0265
ip.prefix_length = 24 ip.prefix_length = 24
if (ip_step != 1): if (ip_step != 1):
ip.step = ip_step ip.step = ip_step
ip.default_gateway = 0x0a0a0201 ip.default_gateway = 0x0a0a0201
if has_ip6:
ip = dg.Extensions[emul.ip6]
ip6addr = ip6_address('1234:2::65/96')
ip.address.CopyFrom(ip6addr.ip6)
ip.prefix_length = ip6addr.prefixlen
if (ip_step != 1):
ip.step.CopyFrom(ip6_address(ip_step).ip6)
ip.default_gateway.CopyFrom(ip6addr.gateway)
drone.modifyDeviceGroup(devgrp_cfg) drone.modifyDeviceGroup(devgrp_cfg)
# configure the tx stream # TODO: reuse dev_cfg['ip_ver']
ip_versions = []
if has_ip4:
ip_versions.append('ip4')
if has_ip6:
ip_versions.append('ip6')
# add the tx stream(s) - we may need more than one
stream_id = ost_pb.StreamIdList()
stream_id.port_id.CopyFrom(ports.tx.port_id[0])
for i in range(len(ip_versions)):
stream_id.stream_id.add().id = i
log.info('adding tx_stream %d' % stream_id.stream_id[i].id)
drone.addStream(stream_id)
# configure the tx stream(s)
stream_cfg = ost_pb.StreamConfigList() stream_cfg = ost_pb.StreamConfigList()
stream_cfg.port_id.CopyFrom(ports.tx.port_id[0]) stream_cfg.port_id.CopyFrom(ports.tx.port_id[0])
for i in range(len(ip_versions)):
s = stream_cfg.stream.add() s = stream_cfg.stream.add()
s.stream_id.id = stream_id.stream_id[0].id s.stream_id.id = stream_id.stream_id[i].id
s.core.is_enabled = True s.core.is_enabled = True
s.control.packets_per_sec = 100 s.control.packets_per_sec = 100
s.control.num_packets = 10 s.control.num_packets = 10
# setup stream protocols as mac:eth2:ip4:udp:payload # setup stream protocols as mac:eth2:ip:udp:payload
p = s.protocol.add() p = s.protocol.add()
p.protocol_id.id = ost_pb.Protocol.kMacFieldNumber p.protocol_id.id = ost_pb.Protocol.kMacFieldNumber
p.Extensions[mac].dst_mac_mode = Mac.e_mm_resolve p.Extensions[mac].dst_mac_mode = Mac.e_mm_resolve
@ -409,8 +455,10 @@ def test_multiEmulDevNoVlan(drone, ports, dut, dut_ports, dut_ip,
p = s.protocol.add() p = s.protocol.add()
p.protocol_id.id = ost_pb.Protocol.kEth2FieldNumber p.protocol_id.id = ost_pb.Protocol.kEth2FieldNumber
if ip_versions[i] == 'ip4':
p = s.protocol.add() p = s.protocol.add()
p.protocol_id.id = ost_pb.Protocol.kIp4FieldNumber p.protocol_id.id = ost_pb.Protocol.kIp4FieldNumber
ip = None
ip = p.Extensions[ip4] ip = p.Extensions[ip4]
ip.src_ip = 0x0a0a0165 ip.src_ip = 0x0a0a0165
if ip_step == 1: if ip_step == 1:
@ -438,11 +486,50 @@ def test_multiEmulDevNoVlan(drone, ports, dut, dut_ports, dut_ip,
vf.step = ip_step vf.step = ip_step
vf.mode = ost_pb.VariableField.kIncrement vf.mode = ost_pb.VariableField.kIncrement
vf.count = num_devs vf.count = num_devs
elif ip_versions[i] == 'ip6':
p = s.protocol.add()
p.protocol_id.id = ost_pb.Protocol.kIp6FieldNumber
ip = p.Extensions[ip6]
ip6addr = ip6_address('1234:1::65/96')
ip.src_addr_hi = ip6addr.ip6.hi
ip.src_addr_lo = ip6addr.ip6.lo
if ip_step == 1:
ip.src_addr_mode = Ip6.kIncHost
ip.src_addr_count = num_devs
ip.src_addr_prefix = ip6addr.prefixlen
else:
vf = p.variable_field.add()
vf.type = ost_pb.VariableField.kCounter32
vf.offset = 20
vf.mask = 0xFFFFFFFF
vf.value = 0x00000065
vf.step = ip_step
vf.mode = ost_pb.VariableField.kIncrement
vf.count = num_devs
ip6addr = ip6_address('1234:2::65/96')
ip.dst_addr_hi = ip6addr.ip6.hi
ip.dst_addr_lo = ip6addr.ip6.lo
if ip_step == 1:
ip.dst_addr_mode = Ip6.kIncHost
ip.dst_addr_count = num_devs
ip.dst_addr_prefix = ip6addr.prefixlen
else:
vf = p.variable_field.add()
vf.type = ost_pb.VariableField.kCounter32
vf.offset = 20
vf.mask = 0xFFFFFFFF
vf.value = 0x00000065
vf.step = ip_step
vf.mode = ost_pb.VariableField.kIncrement
vf.count = num_devs
else:
assert False # unreachable
s.protocol.add().protocol_id.id = ost_pb.Protocol.kUdpFieldNumber s.protocol.add().protocol_id.id = ost_pb.Protocol.kUdpFieldNumber
s.protocol.add().protocol_id.id = ost_pb.Protocol.kPayloadFieldNumber s.protocol.add().protocol_id.id = ost_pb.Protocol.kPayloadFieldNumber
log.info('configuring tx_stream %d' % stream_id.stream_id[0].id) log.info('configuring tx_stream %d' % stream_id.stream_id[i].id)
drone.modifyStream(stream_cfg) drone.modifyStream(stream_cfg)
# FIXME(needed?): clear tx/rx stats # FIXME(needed?): clear tx/rx stats
@ -450,10 +537,11 @@ def test_multiEmulDevNoVlan(drone, ports, dut, dut_ports, dut_ip,
drone.clearStats(ports.tx) drone.clearStats(ports.tx)
drone.clearStats(ports.rx) drone.clearStats(ports.rx)
# clear arp on DUT # clear arp/ndp on DUT
sudo('ip neigh flush all') sudo('ip neigh flush all')
arp_cache = run('ip neigh show') arp_cache = run('ip neigh show')
assert re.search('10.10.[1-2].1\d\d.*lladdr', arp_cache) == None assert re.search('10.10.[1-2].1\d\d.*lladdr', arp_cache) == None
assert re.search('1234:[1-2]::\[\da-f]+.*lladdr', arp_cache) == None
# resolve ARP on tx/rx ports # resolve ARP on tx/rx ports
log.info('resolving Neighbors on tx/rx ports ...') log.info('resolving Neighbors on tx/rx ports ...')
@ -470,12 +558,23 @@ def test_multiEmulDevNoVlan(drone, ports, dut, dut_ports, dut_ip,
cap_pkts = subprocess.check_output([tshark, '-nr', 'capture.pcap']) cap_pkts = subprocess.check_output([tshark, '-nr', 'capture.pcap'])
print(cap_pkts) print(cap_pkts)
log.info('dumping Tx capture buffer (filtered)') log.info('dumping Tx capture buffer (filtered)')
for i in range(num_devs): for i in range(len(ip_versions)):
for j in range(num_devs):
if ip_versions[i] == 'ip4':
filter = '(arp.opcode == 1)' \
' && (arp.src.proto_ipv4 == 10.10.1.' \
+ str(101+j*ip_step) + ')' \
' && (arp.dst.proto_ipv4 == 10.10.1.1)'
elif ip_versions[i] == 'ip6':
filter = '(icmpv6.type == 135)' \
' && (icmpv6.nd.ns.target_address == 1234:1::1)' \
' && (icmpv6.nd.ns.target_address == 1234:1::' \
+ format(0x65+i*ip_step, 'x')+')'
print filter
else:
assert False # unreachable
cap_pkts = subprocess.check_output([tshark, '-nr', 'capture.pcap', cap_pkts = subprocess.check_output([tshark, '-nr', 'capture.pcap',
'-R', '(arp.opcode == 1)' '-Y', filter])
' && (arp.src.proto_ipv4 == 10.10.1.'
+str(101+i*ip_step)+')'
' && (arp.dst.proto_ipv4 == 10.10.1.1)'])
print(cap_pkts) print(cap_pkts)
assert cap_pkts.count('\n') == 1 assert cap_pkts.count('\n') == 1
@ -486,12 +585,23 @@ def test_multiEmulDevNoVlan(drone, ports, dut, dut_ports, dut_ip,
cap_pkts = subprocess.check_output([tshark, '-nr', 'capture.pcap']) cap_pkts = subprocess.check_output([tshark, '-nr', 'capture.pcap'])
print(cap_pkts) print(cap_pkts)
log.info('dumping Rx capture buffer (filtered)') log.info('dumping Rx capture buffer (filtered)')
for i in range(num_devs): for i in range(len(ip_versions)):
for j in range(num_devs):
if ip_versions[i] == 'ip4':
filter = '(arp.opcode == 1)' \
' && (arp.src.proto_ipv4 == 10.10.2.' \
+ str(101+j*ip_step) + ')' \
' && (arp.dst.proto_ipv4 == 10.10.2.1)'
elif ip_versions[i] == 'ip6':
filter = '(icmpv6.type == 135)' \
' && (icmpv6.nd.ns.target_address == 1234:2::1)' \
' && (icmpv6.nd.ns.target_address == 1234:2::' \
+ format(0x65+i*ip_step, 'x')+')'
else:
assert False # unreachable
print filter
cap_pkts = subprocess.check_output([tshark, '-nr', 'capture.pcap', cap_pkts = subprocess.check_output([tshark, '-nr', 'capture.pcap',
'-R', '(arp.opcode == 1)' '-Y', filter])
' && (arp.src.proto_ipv4 == 10.10.2.'
+str(101+i*ip_step)+')'
' && (arp.dst.proto_ipv4 == 10.10.2.1)'])
print(cap_pkts) print(cap_pkts)
assert cap_pkts.count('\n') == 0 assert cap_pkts.count('\n') == 0
@ -505,11 +615,13 @@ def test_multiEmulDevNoVlan(drone, ports, dut, dut_ports, dut_ip,
for dev_cfg, device in zip(device_config, devices): for dev_cfg, device in zip(device_config, devices):
resolved = False resolved = False
for arp in device.arp: for arp in device.arp:
# TODO: pretty print ip and mac print('%s: %s %012x' %
print('%08x: %08x %012x' % (str(ipaddress.ip_address(dev_cfg.ip4)),
(dev_cfg.ip4, arp.ip4, arp.mac)) str(ipaddress.ip_address(arp.ip4)),
arp.mac))
if (arp.ip4 == dev_cfg.ip4_default_gateway) and (arp.mac): if (arp.ip4 == dev_cfg.ip4_default_gateway) and (arp.mac):
resolved = True resolved = True
# TODO: ip6/ndp
assert resolved assert resolved
log.info('retrieving ARP entries on rx port') log.info('retrieving ARP entries on rx port')
@ -525,16 +637,19 @@ def test_multiEmulDevNoVlan(drone, ports, dut, dut_ports, dut_ip,
# TODO: pretty print ip and mac # TODO: pretty print ip and mac
print('%08x: %08x %012x' % print('%08x: %08x %012x' %
(dev_cfg.ip4, arp.ip4, arp.mac)) (dev_cfg.ip4, arp.ip4, arp.mac))
# TODO: ip6/ndp
# ping the tx devices from the DUT # ping the tx devices from the DUT
for i in range(num_devs): for i in range(num_devs):
out = run('ping -c3 10.10.1.'+str(101+i*ip_step), warn_only=True) out = run('ping -c3 10.10.1.'+str(101+i*ip_step), warn_only=True)
assert '100% packet loss' not in out assert '100% packet loss' not in out
# TODO: ip6/ndp
# ping the tx devices from the DUT # ping the tx devices from the DUT
for i in range(num_devs): for i in range(num_devs):
out = run('ping -c3 10.10.2.'+str(101+i*ip_step), warn_only=True) out = run('ping -c3 10.10.2.'+str(101+i*ip_step), warn_only=True)
assert '100% packet loss' not in out assert '100% packet loss' not in out
# TODO: ip6/ndp
# We are all set now - so transmit the stream now # We are all set now - so transmit the stream now
drone.startCapture(ports.rx) drone.startCapture(ports.rx)
@ -552,7 +667,7 @@ def test_multiEmulDevNoVlan(drone, ports, dut, dut_ports, dut_ip,
log.info('dumping Rx capture buffer (filtered)') log.info('dumping Rx capture buffer (filtered)')
for i in range(num_devs): for i in range(num_devs):
cap_pkts = subprocess.check_output([tshark, '-nr', 'capture.pcap', cap_pkts = subprocess.check_output([tshark, '-nr', 'capture.pcap',
'-R', '(ip.src == 10.10.1.' + str(101+i*ip_step) + ') ' '-Y', '(ip.src == 10.10.1.' + str(101+i*ip_step) + ') '
' && (ip.dst == 10.10.2.' + str(101+i*ip_step) + ')' ' && (ip.dst == 10.10.2.' + str(101+i*ip_step) + ')'
' && (eth.dst == 00:01:02:03:0b:' ' && (eth.dst == 00:01:02:03:0b:'
+ format(1+i*mac_step, '02x')+')']) + format(1+i*mac_step, '02x')+')'])
@ -581,8 +696,8 @@ def test_multiEmulDevNoVlan(drone, ports, dut, dut_ports, dut_ip,
{'base': 31, 'count': 2}, {'base': 31, 'count': 2},
{'base': 1, 'count': 3}], {'base': 1, 'count': 3}],
]) ])
def test_multiEmulDevPerVlan(request, drone, ports, dut, dut_ports, stream_id, def test_multiEmulDevPerVlan(request, drone, ports, dut, dut_ports,
emul_ports, dgid_list, vlan_cfg): stream_clear, emul_ports, dgid_list, vlan_cfg):
# ----------------------------------------------------------------- # # ----------------------------------------------------------------- #
# TESTCASE: Emulate multiple IPv4 device per multiple single-tag VLANs # TESTCASE: Emulate multiple IPv4 device per multiple single-tag VLANs
# #
@ -669,13 +784,7 @@ def test_multiEmulDevPerVlan(request, drone, ports, dut, dut_ports, stream_id,
drone.modifyDeviceGroup(devgrp_cfg) drone.modifyDeviceGroup(devgrp_cfg)
# configure the tx stream(s) # add the tx stream(s) - we need more than one stream
# we need more than one stream, so delete old one
# and create as many as we need
# FIXME: restore the single stream at end?
log.info('deleting tx_stream %d' % stream_id.stream_id[0].id)
drone.deleteStream(stream_id)
stream_id = ost_pb.StreamIdList() stream_id = ost_pb.StreamIdList()
stream_id.port_id.CopyFrom(ports.tx.port_id[0]) stream_id.port_id.CopyFrom(ports.tx.port_id[0])
for i in range(num_vlans): for i in range(num_vlans):