From 8efdb44e6aca359431a7e9122e03ce793a65019c Mon Sep 17 00:00:00 2001 From: Srivats P Date: Sat, 23 Jan 2016 21:01:47 +0530 Subject: [PATCH] Device Emulation (contd.): Enhanced the VLAN Devices test case to include IPv6 (and dual stack) scenario(s) --- test/emultest.py | 371 +++++++++++++++++++++++++++++++---------------- 1 file changed, 248 insertions(+), 123 deletions(-) diff --git a/test/emultest.py b/test/emultest.py index a0fb88e..7052760 100644 --- a/test/emultest.py +++ b/test/emultest.py @@ -260,6 +260,8 @@ def dut_vlans(request, dut_ports): for i in range(len(devices.rx)): vrf = 'vrf' + str(i+1) sudo('ip netns add ' + vrf) + sudo('ip netns exec ' + vrf + + ' sysctl -w net.ipv6.conf.all.forwarding=1') dev = devices.rx[i] sudo('ip link set ' + dev @@ -267,6 +269,9 @@ def dut_vlans(request, dut_ports): sudo('ip netns exec ' + vrf + ' ip addr add 10.1.1.1/24' + ' dev ' + dev) + sudo('ip netns exec ' + vrf + + ' ip -6 addr add 1234:1::1/96' + + ' dev ' + dev) sudo('ip netns exec ' + vrf + ' ip link set ' + dev + ' up') @@ -276,6 +281,9 @@ def dut_vlans(request, dut_ports): sudo('ip netns exec ' + vrf + ' ip addr add 10.1.2.1/24' + ' dev ' + dev) + sudo('ip netns exec ' + vrf + + ' ip -6 addr add 1234:2::1/96' + + ' dev ' + dev) sudo('ip netns exec ' + vrf + ' ip link set ' + dev + ' up') @@ -625,8 +633,8 @@ def test_multiEmulDevNoVlan(drone, ports, dut, dut_ports, dut_ip, devices = neigh_list.Extensions[emul.devices] log.info('ARP/NDP Table on tx port') for dev_cfg, device in zip(device_config, devices): - resolved = False if has_ip4: + resolved = False for arp in device.arp: print('%s: %s %012x' % (str(ipaddress.ip_address(dev_cfg.ip4)), @@ -636,6 +644,7 @@ def test_multiEmulDevNoVlan(drone, ports, dut, dut_ports, dut_ip, resolved = True assert resolved if has_ip6: + resolved = False for ndp in device.ndp: print('%s: %s %012x' % (str(ip6_address(dev_cfg.ip6)), @@ -654,19 +663,17 @@ def test_multiEmulDevNoVlan(drone, ports, dut, dut_ports, dut_ip, for dev_cfg, device in zip(device_config, devices): # verify *no* ARPs/NDPs learnt on rx port if has_ip4: - assert len(device.arp) == 0 for arp in device.arp: print('%s: %s %012x' % (str(ipaddress.ip_address(dev_cfg.ip4)), - str(ipaddress.ip_address(arp.ip4)), - arp.mac)) + str(ipaddress.ip_address(arp.ip4)), arp.mac)) + assert len(device.arp) == 0 if has_ip6: - assert len(device.ndp) == 0 for ndp in device.ndp: print('%s: %s %012x' % (str(ip6_address(dev_cfg.ip6)), - str(ip6_address(ndp.ip6)), - ndp.mac)) + str(ip6_address(ndp.ip6)), ndp.mac)) + assert len(device.ndp) == 0 # ping the tx devices from the DUT for i in range(num_devs): @@ -730,26 +737,26 @@ def test_multiEmulDevNoVlan(drone, ports, dut, dut_ports, dut_ip, drone.stopTransmit(ports.tx) run('ip neigh show') -@pytest.mark.parametrize('vlan_cfg', [ - [{'base': 11, 'count': 5}], +@pytest.mark.parametrize('vlan_cfg,ip_ver', [ + ([{'base': 11, 'count': 5}], [6]), - [{'base': 11, 'count': 2}, - {'base': 21, 'count': 3}], + ([{'base': 11, 'count': 2}, + {'base': 21, 'count': 3}], [4]), - [{'base': 11, 'count': 2, 'tpid': 0x88a8}, - {'base': 21, 'count': 3}], + ([{'base': 11, 'count': 2, 'tpid': 0x88a8}, + {'base': 21, 'count': 3}], [4, 6]), - [{'base': 11, 'count': 2}, + ([{'base': 11, 'count': 2}, {'base': 21, 'count': 3, 'step': 2}, - {'base': 31, 'count': 2, 'step': 3}], + {'base': 31, 'count': 2, 'step': 3}], [6]), - [{'base': 11, 'count': 2}, + ([{'base': 11, 'count': 2}, {'base': 21, 'count': 3}, {'base': 31, 'count': 2}, - {'base': 1, 'count': 3}], + {'base': 1, 'count': 3}], [4]) ]) def test_multiEmulDevPerVlan(request, drone, ports, dut, dut_ports, - stream_clear, emul_ports, dgid_list, vlan_cfg): + stream_clear, emul_ports, dgid_list, vlan_cfg, ip_ver): # ----------------------------------------------------------------- # # TESTCASE: Emulate multiple IPv4 device per multiple single-tag VLANs # @@ -757,6 +764,7 @@ def test_multiEmulDevPerVlan(request, drone, ports, dut, dut_ports, # .1/ \.1 # / \ # 10.1.1/24 10.1.2/24 + # 1234:1/96 1234:2/96 # [vlans] [vlans] # / \ # /.101-103 \.101-103 @@ -768,8 +776,14 @@ def test_multiEmulDevPerVlan(request, drone, ports, dut, dut_ports, num_vlans = num_vlans * vcfg['count'] test_multiEmulDevPerVlan.vlan_cfg = vlan_cfg num_devs_per_vlan = 3 - tx_ip_base = 0x0a010165 - rx_ip_base = 0x0a010265 + has_ip4 = True if 4 in ip_ver else False + has_ip6 = True if 6 in ip_ver else False + if has_ip4: + tx_ip4_base = 0x0a010165 + rx_ip4_base = 0x0a010265 + if has_ip6: + tx_ip6_base = ip6_address('1234:1::65/96') + rx_ip6_base = ip6_address('1234:2::65/96') # configure vlans on the DUT dut_vlans(request, dut_ports) @@ -806,10 +820,16 @@ def test_multiEmulDevPerVlan(request, drone, ports, dut, dut_ports, v.step = vcfg['step'] dg.device_count = num_devs_per_vlan dg.Extensions[emul.mac].address = 0x000102030a01 - ip = dg.Extensions[emul.ip4] - ip.address = tx_ip_base - ip.prefix_length = 24 - ip.default_gateway = (tx_ip_base & 0xffffff00) | 0x01 + if has_ip4: + ip = dg.Extensions[emul.ip4] + ip.address = tx_ip4_base + ip.prefix_length = 24 + ip.default_gateway = (tx_ip4_base & 0xffffff00) | 0x01 + if has_ip6: + ip = dg.Extensions[emul.ip6] + ip.address.CopyFrom(tx_ip6_base.ip6) + ip.prefix_length = tx_ip6_base.prefixlen + ip.default_gateway.CopyFrom(tx_ip6_base.gateway) drone.modifyDeviceGroup(devgrp_cfg) @@ -829,71 +849,100 @@ def test_multiEmulDevPerVlan(request, drone, ports, dut, dut_ports, v.step = vcfg['step'] dg.device_count = num_devs_per_vlan dg.Extensions[emul.mac].address = 0x000102030b01 - ip = dg.Extensions[emul.ip4] - ip.address = rx_ip_base - ip.prefix_length = 24 - ip.default_gateway = (rx_ip_base & 0xffffff00) | 0x01 + if has_ip4: + ip = dg.Extensions[emul.ip4] + ip.address = rx_ip4_base + ip.prefix_length = 24 + ip.default_gateway = (rx_ip4_base & 0xffffff00) | 0x01 + if has_ip6: + ip = dg.Extensions[emul.ip6] + ip.address.CopyFrom(rx_ip6_base.ip6) + ip.prefix_length = rx_ip6_base.prefixlen + ip.default_gateway.CopyFrom(rx_ip6_base.gateway) drone.modifyDeviceGroup(devgrp_cfg) # add the tx stream(s) - we need more than one stream stream_id = ost_pb.StreamIdList() stream_id.port_id.CopyFrom(ports.tx.port_id[0]) - for i in range(num_vlans): + for i in range(num_vlans * len(ip_ver)): stream_id.stream_id.add().id = i log.info('adding tx_stream %d' % stream_id.stream_id[i].id) drone.addStream(stream_id) + # configure the tx stream(s) stream_cfg = ost_pb.StreamConfigList() stream_cfg.port_id.CopyFrom(ports.tx.port_id[0]) for i in range(num_vlans): - s = stream_cfg.stream.add() - s.stream_id.id = stream_id.stream_id[i].id - s.core.name = 'stream ' + str(s.stream_id.id) - s.core.is_enabled = True - s.core.ordinal = i - s.control.packets_per_sec = 100 - s.control.num_packets = num_devs_per_vlan + for j in range(len(ip_ver)): + idx = i*len(ip_ver) + j + s = stream_cfg.stream.add() + s.stream_id.id = stream_id.stream_id[idx].id + s.core.name = 'stream ' + str(s.stream_id.id) + s.core.is_enabled = True + s.core.ordinal = idx + s.core.frame_len = 1024 + s.control.packets_per_sec = 100 + s.control.num_packets = num_devs_per_vlan - # setup stream protocols as mac:vlan:eth2:ip4:udp:payload - p = s.protocol.add() - p.protocol_id.id = ost_pb.Protocol.kMacFieldNumber - p.Extensions[mac].dst_mac_mode = Mac.e_mm_resolve - p.Extensions[mac].src_mac_mode = Mac.e_mm_resolve - - ids = dut_vlans.vlans[i].split('.') - for id, j in zip(ids, range(len(ids))): + # setup stream protocols as mac:vlan:eth2:ip4:udp:payload p = s.protocol.add() - p.protocol_id.id = ost_pb.Protocol.kVlanFieldNumber - p.Extensions[vlan].vlan_tag = int(id) - if 'tpid' in vlan_cfg[j]: - p.Extensions[vlan].tpid = vlan_cfg[j]['tpid'] - p.Extensions[vlan].is_override_tpid = True + p.protocol_id.id = ost_pb.Protocol.kMacFieldNumber + p.Extensions[mac].dst_mac_mode = Mac.e_mm_resolve + p.Extensions[mac].src_mac_mode = Mac.e_mm_resolve - p = s.protocol.add() - p.protocol_id.id = ost_pb.Protocol.kEth2FieldNumber + ids = dut_vlans.vlans[i].split('.') + for id, idx in zip(ids, range(len(ids))): + p = s.protocol.add() + p.protocol_id.id = ost_pb.Protocol.kVlanFieldNumber + p.Extensions[vlan].vlan_tag = int(id) + if 'tpid' in vlan_cfg[idx]: + p.Extensions[vlan].tpid = vlan_cfg[idx]['tpid'] + p.Extensions[vlan].is_override_tpid = True - p = s.protocol.add() - p.protocol_id.id = ost_pb.Protocol.kIp4FieldNumber - ip = p.Extensions[ip4] - ip.src_ip = tx_ip_base - ip.src_ip_mode = Ip4.e_im_inc_host - ip.src_ip_count = num_devs_per_vlan - ip.dst_ip = rx_ip_base - ip.dst_ip_mode = Ip4.e_im_inc_host - ip.dst_ip_count = num_devs_per_vlan + p = s.protocol.add() + p.protocol_id.id = ost_pb.Protocol.kEth2FieldNumber - p = s.protocol.add() - p.protocol_id.id = ost_pb.Protocol.kUdpFieldNumber - p = s.protocol.add() - p.protocol_id.id = ost_pb.Protocol.kPayloadFieldNumber + if ip_ver[j] == 4: + p = s.protocol.add() + p.protocol_id.id = ost_pb.Protocol.kIp4FieldNumber + ip = p.Extensions[ip4] + ip.src_ip = tx_ip4_base + ip.src_ip_mode = Ip4.e_im_inc_host + ip.src_ip_count = num_devs_per_vlan + ip.dst_ip = rx_ip4_base + ip.dst_ip_mode = Ip4.e_im_inc_host + ip.dst_ip_count = num_devs_per_vlan + elif ip_ver[j] == 6: + p = s.protocol.add() + p.protocol_id.id = ost_pb.Protocol.kIp6FieldNumber + ip = p.Extensions[ip6] - log.info('configuring tx_stream %d' % stream_id.stream_id[i].id) + ip.src_addr_hi = tx_ip6_base.ip6.hi + ip.src_addr_lo = tx_ip6_base.ip6.lo + ip.src_addr_mode = Ip6.kIncHost + ip.src_addr_count = num_devs_per_vlan + ip.src_addr_prefix = tx_ip6_base.prefixlen + + ip.dst_addr_hi = rx_ip6_base.ip6.hi + ip.dst_addr_lo = rx_ip6_base.ip6.lo + ip.dst_addr_mode = Ip6.kIncHost + ip.dst_addr_count = num_devs_per_vlan + ip.dst_addr_prefix = rx_ip6_base.prefixlen + else: + assert false # unreachable + + p = s.protocol.add() + p.protocol_id.id = ost_pb.Protocol.kUdpFieldNumber + p = s.protocol.add() + p.protocol_id.id = ost_pb.Protocol.kPayloadFieldNumber + + log.info('configuring tx_stream %d' % stream_id.stream_id[idx].id) drone.modifyStream(stream_cfg) - # clear arp on DUT + # clear arp/ndp on DUT for i in range(num_vlans): vrf = 'vrf' + str(i+1) @@ -902,8 +951,14 @@ def test_multiEmulDevPerVlan(request, drone, ports, dut, dut_ports, arp_cache = sudo('ip netns exec ' + vrf + ' ip neigh show') assert re.search('10.1.[1-2].20[1-5].*lladdr', arp_cache) == None + assert re.search('1234:[1-2]::\[\da-f]+.*lladdr', arp_cache) == None - # resolve ARP on tx/rx ports + # wait for interface to do DAD? Otherwise we don't get replies for NS + # FIXME: find alternative to sleep + if has_ip6: + time.sleep(5) + + # resolve ARP/NDP on tx/rx ports log.info('resolving Neighbors on tx/rx ports ...') drone.startCapture(emul_ports) drone.clearDeviceNeighbors(emul_ports) @@ -911,7 +966,7 @@ def test_multiEmulDevPerVlan(request, drone, ports, dut, dut_ports, time.sleep(3) drone.stopCapture(emul_ports) - # verify ARP Requests sent out from tx port + # verify ARP/NDP Requests sent out from tx port buff = drone.getCaptureBuffer(emul_ports.port_id[0]) drone.saveCaptureBuffer(buff, 'capture.pcap') log.info('dumping Tx capture buffer (all)') @@ -923,18 +978,30 @@ def test_multiEmulDevPerVlan(request, drone, ports, dut, dut_ports, print(cap_pkts) log.info('dumping Tx capture buffer (filtered)') for i in range(num_vlans): - for j in range(num_devs_per_vlan): - cap_pkts = subprocess.check_output([tshark, '-nr', 'capture.pcap', - '-R', vlan_filter[i] + - ' && (arp.opcode == 1)' - ' && (arp.src.proto_ipv4 == 10.1.1.' - +str(101+j)+')' - ' && (arp.dst.proto_ipv4 == 10.1.1.1)']) - print(cap_pkts) - assert cap_pkts.count('\n') == 1 + for j in range(len(ip_ver)): + if ip_ver[j] == 4: + filter = vlan_filter[i] + ' && (arp.opcode == 1)' \ + ' && (arp.src.proto_ipv4 == 10.1.1.)' \ + ' && (arp.dst.proto_ipv4 == 10.1.1.1)' \ + ' && !expert.severity' + elif ip_ver[j] == 6: + filter = vlan_filter[i] + ' && (icmpv6.type == 135)' \ + ' && (ipv6.src == 1234:1::)' \ + ' && (icmpv6.nd.ns.target_address == 1234:1::1)' \ + ' && !expert.severity' + for k in range(num_devs_per_vlan): + if ip_ver[j] == 4: + filter = filter.replace('', str(101+k)) + elif ip_ver[j] == 6: + filter = filter.replace('', format(0x65+k, 'x')) + print filter + cap_pkts = subprocess.check_output([tshark, '-nr', + 'capture.pcap', '-Y', filter]) + print(cap_pkts) + assert cap_pkts.count('\n') == 1 - # verify *no* ARP Requests sent out from rx port - buff = drone.getCaptureBuffer(emul_ports.port_id[1]) + # verify *no* ARP/NDP Requests sent out from rx port + buff = drone.getCaptureBuffer(emul_ports.port_id[0]) drone.saveCaptureBuffer(buff, 'capture.pcap') log.info('dumping Rx capture buffer (all)') cap_pkts = subprocess.check_output([tshark, '-nr', 'capture.pcap']) @@ -945,63 +1012,109 @@ def test_multiEmulDevPerVlan(request, drone, ports, dut, dut_ports, print(cap_pkts) log.info('dumping Rx capture buffer (filtered)') for i in range(num_vlans): - for j in range(num_devs_per_vlan): - cap_pkts = subprocess.check_output([tshark, '-nr', 'capture.pcap', - '-R', vlan_filter[i] + - ' && (arp.opcode == 1)' - ' && (arp.src.proto_ipv4 == 10.1.2.' - +str(101+j)+')' - ' && (arp.dst.proto_ipv4 == 10.1.2.1)']) - print(cap_pkts) - assert cap_pkts.count('\n') == 0 + for j in range(len(ip_ver)): + if ip_ver[j] == 4: + filter = vlan_filter[i] + ' && (arp.opcode == 1)' \ + ' && (arp.src.proto_ipv4 == 10.1.2.)' \ + ' && (arp.dst.proto_ipv4 == 10.1.2.1)' \ + ' && !expert.severity' + elif ip_ver[j] == 6: + filter = vlan_filter[i] + ' && (icmpv6.type == 135)' \ + ' && (ipv6.src == 1234:2::)' \ + ' && (icmpv6.nd.ns.target_address == 1234:2::1)' \ + ' && !expert.severity' + for k in range(num_devs_per_vlan): + if ip_ver[j] == 4: + filter = filter.replace('', str(101+k)) + elif ip_ver[j] == 6: + filter = filter.replace('', format(0x65+k, 'x')) + print filter + cap_pkts = subprocess.check_output([tshark, '-nr', + 'capture.pcap', '-Y', filter]) + print(cap_pkts) + assert cap_pkts.count('\n') == 0 - # retrieve and verify ARP Table on tx/rx ports - log.info('retrieving ARP entries on tx port') + # retrieve and verify ARP/NDP Table on tx/rx ports + log.info('retrieving ARP/NDP entries on tx port') device_list = drone.getDeviceList(emul_ports.port_id[0]) device_config = device_list.Extensions[emul.port_device] neigh_list = drone.getDeviceNeighbors(emul_ports.port_id[0]) devices = neigh_list.Extensions[emul.devices] - log.info('ARP Table on tx port') + log.info('ARP/NDP Table on tx port') for dev_cfg, device in zip(device_config, devices): - resolved = False - for arp in device.arp: - # TODO: print all configured vlans, not just the first - # TODO: pretty print ip and mac - print('v%d|%08x: %08x %012x' % - (dev_cfg.vlan[0] & 0xffff, dev_cfg.ip4, arp.ip4, arp.mac)) - if (arp.ip4 == dev_cfg.ip4_default_gateway) and (arp.mac): - resolved = True - assert resolved + vlans = '' + for v in dev_cfg.vlan: + vlans += str(v & 0xffff) + ' ' + if has_ip4: + resolved = False + for arp in device.arp: + print('%s%s: %s %012x' % + (vlans, str(ipaddress.ip_address(dev_cfg.ip4)), + str(ipaddress.ip_address(arp.ip4)), arp.mac)) + if (arp.ip4 == dev_cfg.ip4_default_gateway) and (arp.mac): + resolved = True + assert resolved + if has_ip6: + resolved = False + for ndp in device.ndp: + print('%s%s: %s %012x' % + (vlans, str(ip6_address(dev_cfg.ip6)), + str(ip6_address(ndp.ip6)), ndp.mac)) + if (ndp.ip6 == dev_cfg.ip6_default_gateway) and (ndp.mac): + resolved = True + assert resolved log.info('retrieving ARP entries on rx port') device_list = drone.getDeviceList(emul_ports.port_id[0]) device_config = device_list.Extensions[emul.port_device] neigh_list = drone.getDeviceNeighbors(emul_ports.port_id[1]) devices = neigh_list.Extensions[emul.devices] - log.info('ARP Table on rx port') + log.info('ARP/NDP Table on rx port') for dev_cfg, device in zip(device_config, devices): - # verify *no* ARPs learnt on rx port - assert len(device.arp) == 0 - for arp in device.arp: - # TODO: pretty print ip and mac - print('v%d|%08x: %08x %012x' % - (dev_cfg.vlan[0] & 0xffff, dev_cfg.ip4, arp.ip4, arp.mac)) + vlans = '' + for v in dev_cfg.vlan: + vlans += str(v & 0xffff) + ' ' + # verify *no* ARPs/NDPs learnt on rx port + if has_ip4: + for arp in device.arp: + print('%s%s: %s %012x' % + (vlans, str(ipaddress.ip_address(dev_cfg.ip4)), + str(ipaddress.ip_address(arp.ip4)), arp.mac)) + assert len(device.arp) == 0 + if has_ip6: + for ndp in device.ndp: + print('%s%s: %s %012x' % + (vlans, str(ip6_address(dev_cfg.ip6)), + str(ip6_address(ndp.ip6)), ndp.mac)) + assert len(device.ndp) == 0 # ping the tx devices from the DUT for i in range(num_vlans): vrf = 'vrf' + str(i+1) for j in range(num_devs_per_vlan): - out = sudo('ip netns exec ' + vrf - + ' ping -c3 10.1.1.'+str(101+j), warn_only=True) - assert '100% packet loss' not in out + if has_ip4: + out = sudo('ip netns exec ' + vrf + + ' ping -c3 10.1.1.'+str(101+j), warn_only=True) + assert '100% packet loss' not in out + if has_ip6: + out = sudo('ip netns exec ' + vrf + + ' ping -6 -c3 1234:1::'+format(101+j, 'x'), + warn_only=True) + assert '100% packet loss' not in out # ping the rx devices from the DUT for i in range(num_vlans): vrf = 'vrf' + str(i+1) for j in range(num_devs_per_vlan): - out = sudo('ip netns exec ' + vrf - + ' ping -c3 10.1.2.'+str(101+j), warn_only=True) - assert '100% packet loss' not in out + if has_ip4: + out = sudo('ip netns exec ' + vrf + + ' ping -c3 10.1.2.'+str(101+j), warn_only=True) + assert '100% packet loss' not in out + if has_ip6: + out = sudo('ip netns exec ' + vrf + + ' ping -6 -c3 1234:2::'+format(101+j, 'x'), + warn_only=True) + assert '100% packet loss' not in out # we are all set - send data stream(s) drone.startCapture(ports.rx) @@ -1017,22 +1130,34 @@ def test_multiEmulDevPerVlan(request, drone, ports, dut, dut_ports, log.info('dumping Rx capture buffer (all)') cap_pkts = subprocess.check_output([tshark, '-nr', 'capture.pcap']) print(cap_pkts) - log.info('dumping Tx capture buffer (all pkts - vlans only)') + log.info('dumping Rx capture buffer (all pkts - vlans only)') cap_pkts = subprocess.check_output([tshark, '-nr', 'capture.pcap', '-Tfields', '-eframe.number', '-eieee8021ad.id', '-evlan.id']) print(cap_pkts) log.info('dumping Rx capture buffer (filtered)') for i in range(num_vlans): for j in range(num_devs_per_vlan): - cap_pkts = subprocess.check_output( - [tshark, '-nr', 'capture.pcap', - '-R', vlan_filter[i] + - ' && (ip.src == 10.1.1.' + str(101+j) + ') ' - ' && (ip.dst == 10.1.2.' + str(101+j) + ')' - ' && (eth.dst == 00:01:02:03:0b:' - + format(1+j, '02x')+')']) - print(cap_pkts) - assert cap_pkts.count('\n') == 1 + if has_ip4: + filter = vlan_filter[i] + \ + ' && (ip.src == 10.1.1.' + str(101+j) + ')' \ + ' && (ip.dst == 10.1.2.' + str(101+j) + ')' \ + ' && (eth.dst == 00:01:02:03:0b:' + format(1+j, '02x')+')' + print('filter: %s' % filter) + cap_pkts = subprocess.check_output([tshark, '-nr', + 'capture.pcap', '-Y', filter]) + print(cap_pkts) + assert cap_pkts.count('\n') == 1 + if has_ip6: + filter = vlan_filter[i] \ + + ' && (ipv6.src == 1234:1::' + format(101+j, 'x') + ')' \ + + ' && (ipv6.dst == 1234:2::' + format(101+j, 'x') + ')' \ + + ' && (eth.dst == 00:01:02:03:0b:' \ + + format(1+j, '02x') + ')' + print('filter: %s' % filter) + cap_pkts = subprocess.check_output([tshark, '-nr', + 'capture.pcap', '-Y', filter]) + print(cap_pkts) + assert cap_pkts.count('\n') == 1 os.remove('capture.pcap') import pytest