Device Emulation (contd.): Enhanced the VLAN Devices test case to include IPv6 (and dual stack) scenario(s)

This commit is contained in:
Srivats P 2016-01-23 21:01:47 +05:30
parent efe22149e1
commit 8efdb44e6a

View File

@ -260,6 +260,8 @@ def dut_vlans(request, dut_ports):
for i in range(len(devices.rx)):
vrf = 'vrf' + str(i+1)
sudo('ip netns add ' + vrf)
sudo('ip netns exec ' + vrf
+ ' sysctl -w net.ipv6.conf.all.forwarding=1')
dev = devices.rx[i]
sudo('ip link set ' + dev
@ -267,6 +269,9 @@ def dut_vlans(request, dut_ports):
sudo('ip netns exec ' + vrf
+ ' ip addr add 10.1.1.1/24'
+ ' dev ' + dev)
sudo('ip netns exec ' + vrf
+ ' ip -6 addr add 1234:1::1/96'
+ ' dev ' + dev)
sudo('ip netns exec ' + vrf
+ ' ip link set ' + dev + ' up')
@ -276,6 +281,9 @@ def dut_vlans(request, dut_ports):
sudo('ip netns exec ' + vrf
+ ' ip addr add 10.1.2.1/24'
+ ' dev ' + dev)
sudo('ip netns exec ' + vrf
+ ' ip -6 addr add 1234:2::1/96'
+ ' dev ' + dev)
sudo('ip netns exec ' + vrf
+ ' ip link set ' + dev + ' up')
@ -625,8 +633,8 @@ def test_multiEmulDevNoVlan(drone, ports, dut, dut_ports, dut_ip,
devices = neigh_list.Extensions[emul.devices]
log.info('ARP/NDP Table on tx port')
for dev_cfg, device in zip(device_config, devices):
resolved = False
if has_ip4:
resolved = False
for arp in device.arp:
print('%s: %s %012x' %
(str(ipaddress.ip_address(dev_cfg.ip4)),
@ -636,6 +644,7 @@ def test_multiEmulDevNoVlan(drone, ports, dut, dut_ports, dut_ip,
resolved = True
assert resolved
if has_ip6:
resolved = False
for ndp in device.ndp:
print('%s: %s %012x' %
(str(ip6_address(dev_cfg.ip6)),
@ -654,19 +663,17 @@ def test_multiEmulDevNoVlan(drone, ports, dut, dut_ports, dut_ip,
for dev_cfg, device in zip(device_config, devices):
# verify *no* ARPs/NDPs learnt on rx port
if has_ip4:
assert len(device.arp) == 0
for arp in device.arp:
print('%s: %s %012x' %
(str(ipaddress.ip_address(dev_cfg.ip4)),
str(ipaddress.ip_address(arp.ip4)),
arp.mac))
str(ipaddress.ip_address(arp.ip4)), arp.mac))
assert len(device.arp) == 0
if has_ip6:
assert len(device.ndp) == 0
for ndp in device.ndp:
print('%s: %s %012x' %
(str(ip6_address(dev_cfg.ip6)),
str(ip6_address(ndp.ip6)),
ndp.mac))
str(ip6_address(ndp.ip6)), ndp.mac))
assert len(device.ndp) == 0
# ping the tx devices from the DUT
for i in range(num_devs):
@ -730,26 +737,26 @@ def test_multiEmulDevNoVlan(drone, ports, dut, dut_ports, dut_ip,
drone.stopTransmit(ports.tx)
run('ip neigh show')
@pytest.mark.parametrize('vlan_cfg', [
[{'base': 11, 'count': 5}],
@pytest.mark.parametrize('vlan_cfg,ip_ver', [
([{'base': 11, 'count': 5}], [6]),
[{'base': 11, 'count': 2},
{'base': 21, 'count': 3}],
([{'base': 11, 'count': 2},
{'base': 21, 'count': 3}], [4]),
[{'base': 11, 'count': 2, 'tpid': 0x88a8},
{'base': 21, 'count': 3}],
([{'base': 11, 'count': 2, 'tpid': 0x88a8},
{'base': 21, 'count': 3}], [4, 6]),
[{'base': 11, 'count': 2},
([{'base': 11, 'count': 2},
{'base': 21, 'count': 3, 'step': 2},
{'base': 31, 'count': 2, 'step': 3}],
{'base': 31, 'count': 2, 'step': 3}], [6]),
[{'base': 11, 'count': 2},
([{'base': 11, 'count': 2},
{'base': 21, 'count': 3},
{'base': 31, 'count': 2},
{'base': 1, 'count': 3}],
{'base': 1, 'count': 3}], [4])
])
def test_multiEmulDevPerVlan(request, drone, ports, dut, dut_ports,
stream_clear, emul_ports, dgid_list, vlan_cfg):
stream_clear, emul_ports, dgid_list, vlan_cfg, ip_ver):
# ----------------------------------------------------------------- #
# TESTCASE: Emulate multiple IPv4 device per multiple single-tag VLANs
#
@ -757,6 +764,7 @@ def test_multiEmulDevPerVlan(request, drone, ports, dut, dut_ports,
# .1/ \.1
# / \
# 10.1.1/24 10.1.2/24
# 1234:1/96 1234:2/96
# [vlans] [vlans]
# / \
# /.101-103 \.101-103
@ -768,8 +776,14 @@ def test_multiEmulDevPerVlan(request, drone, ports, dut, dut_ports,
num_vlans = num_vlans * vcfg['count']
test_multiEmulDevPerVlan.vlan_cfg = vlan_cfg
num_devs_per_vlan = 3
tx_ip_base = 0x0a010165
rx_ip_base = 0x0a010265
has_ip4 = True if 4 in ip_ver else False
has_ip6 = True if 6 in ip_ver else False
if has_ip4:
tx_ip4_base = 0x0a010165
rx_ip4_base = 0x0a010265
if has_ip6:
tx_ip6_base = ip6_address('1234:1::65/96')
rx_ip6_base = ip6_address('1234:2::65/96')
# configure vlans on the DUT
dut_vlans(request, dut_ports)
@ -806,10 +820,16 @@ def test_multiEmulDevPerVlan(request, drone, ports, dut, dut_ports,
v.step = vcfg['step']
dg.device_count = num_devs_per_vlan
dg.Extensions[emul.mac].address = 0x000102030a01
ip = dg.Extensions[emul.ip4]
ip.address = tx_ip_base
ip.prefix_length = 24
ip.default_gateway = (tx_ip_base & 0xffffff00) | 0x01
if has_ip4:
ip = dg.Extensions[emul.ip4]
ip.address = tx_ip4_base
ip.prefix_length = 24
ip.default_gateway = (tx_ip4_base & 0xffffff00) | 0x01
if has_ip6:
ip = dg.Extensions[emul.ip6]
ip.address.CopyFrom(tx_ip6_base.ip6)
ip.prefix_length = tx_ip6_base.prefixlen
ip.default_gateway.CopyFrom(tx_ip6_base.gateway)
drone.modifyDeviceGroup(devgrp_cfg)
@ -829,71 +849,100 @@ def test_multiEmulDevPerVlan(request, drone, ports, dut, dut_ports,
v.step = vcfg['step']
dg.device_count = num_devs_per_vlan
dg.Extensions[emul.mac].address = 0x000102030b01
ip = dg.Extensions[emul.ip4]
ip.address = rx_ip_base
ip.prefix_length = 24
ip.default_gateway = (rx_ip_base & 0xffffff00) | 0x01
if has_ip4:
ip = dg.Extensions[emul.ip4]
ip.address = rx_ip4_base
ip.prefix_length = 24
ip.default_gateway = (rx_ip4_base & 0xffffff00) | 0x01
if has_ip6:
ip = dg.Extensions[emul.ip6]
ip.address.CopyFrom(rx_ip6_base.ip6)
ip.prefix_length = rx_ip6_base.prefixlen
ip.default_gateway.CopyFrom(rx_ip6_base.gateway)
drone.modifyDeviceGroup(devgrp_cfg)
# add the tx stream(s) - we need more than one stream
stream_id = ost_pb.StreamIdList()
stream_id.port_id.CopyFrom(ports.tx.port_id[0])
for i in range(num_vlans):
for i in range(num_vlans * len(ip_ver)):
stream_id.stream_id.add().id = i
log.info('adding tx_stream %d' % stream_id.stream_id[i].id)
drone.addStream(stream_id)
# configure the tx stream(s)
stream_cfg = ost_pb.StreamConfigList()
stream_cfg.port_id.CopyFrom(ports.tx.port_id[0])
for i in range(num_vlans):
s = stream_cfg.stream.add()
s.stream_id.id = stream_id.stream_id[i].id
s.core.name = 'stream ' + str(s.stream_id.id)
s.core.is_enabled = True
s.core.ordinal = i
s.control.packets_per_sec = 100
s.control.num_packets = num_devs_per_vlan
for j in range(len(ip_ver)):
idx = i*len(ip_ver) + j
s = stream_cfg.stream.add()
s.stream_id.id = stream_id.stream_id[idx].id
s.core.name = 'stream ' + str(s.stream_id.id)
s.core.is_enabled = True
s.core.ordinal = idx
s.core.frame_len = 1024
s.control.packets_per_sec = 100
s.control.num_packets = num_devs_per_vlan
# setup stream protocols as mac:vlan:eth2:ip4:udp:payload
p = s.protocol.add()
p.protocol_id.id = ost_pb.Protocol.kMacFieldNumber
p.Extensions[mac].dst_mac_mode = Mac.e_mm_resolve
p.Extensions[mac].src_mac_mode = Mac.e_mm_resolve
ids = dut_vlans.vlans[i].split('.')
for id, j in zip(ids, range(len(ids))):
# setup stream protocols as mac:vlan:eth2:ip4:udp:payload
p = s.protocol.add()
p.protocol_id.id = ost_pb.Protocol.kVlanFieldNumber
p.Extensions[vlan].vlan_tag = int(id)
if 'tpid' in vlan_cfg[j]:
p.Extensions[vlan].tpid = vlan_cfg[j]['tpid']
p.Extensions[vlan].is_override_tpid = True
p.protocol_id.id = ost_pb.Protocol.kMacFieldNumber
p.Extensions[mac].dst_mac_mode = Mac.e_mm_resolve
p.Extensions[mac].src_mac_mode = Mac.e_mm_resolve
p = s.protocol.add()
p.protocol_id.id = ost_pb.Protocol.kEth2FieldNumber
ids = dut_vlans.vlans[i].split('.')
for id, idx in zip(ids, range(len(ids))):
p = s.protocol.add()
p.protocol_id.id = ost_pb.Protocol.kVlanFieldNumber
p.Extensions[vlan].vlan_tag = int(id)
if 'tpid' in vlan_cfg[idx]:
p.Extensions[vlan].tpid = vlan_cfg[idx]['tpid']
p.Extensions[vlan].is_override_tpid = True
p = s.protocol.add()
p.protocol_id.id = ost_pb.Protocol.kIp4FieldNumber
ip = p.Extensions[ip4]
ip.src_ip = tx_ip_base
ip.src_ip_mode = Ip4.e_im_inc_host
ip.src_ip_count = num_devs_per_vlan
ip.dst_ip = rx_ip_base
ip.dst_ip_mode = Ip4.e_im_inc_host
ip.dst_ip_count = num_devs_per_vlan
p = s.protocol.add()
p.protocol_id.id = ost_pb.Protocol.kEth2FieldNumber
p = s.protocol.add()
p.protocol_id.id = ost_pb.Protocol.kUdpFieldNumber
p = s.protocol.add()
p.protocol_id.id = ost_pb.Protocol.kPayloadFieldNumber
if ip_ver[j] == 4:
p = s.protocol.add()
p.protocol_id.id = ost_pb.Protocol.kIp4FieldNumber
ip = p.Extensions[ip4]
ip.src_ip = tx_ip4_base
ip.src_ip_mode = Ip4.e_im_inc_host
ip.src_ip_count = num_devs_per_vlan
ip.dst_ip = rx_ip4_base
ip.dst_ip_mode = Ip4.e_im_inc_host
ip.dst_ip_count = num_devs_per_vlan
elif ip_ver[j] == 6:
p = s.protocol.add()
p.protocol_id.id = ost_pb.Protocol.kIp6FieldNumber
ip = p.Extensions[ip6]
log.info('configuring tx_stream %d' % stream_id.stream_id[i].id)
ip.src_addr_hi = tx_ip6_base.ip6.hi
ip.src_addr_lo = tx_ip6_base.ip6.lo
ip.src_addr_mode = Ip6.kIncHost
ip.src_addr_count = num_devs_per_vlan
ip.src_addr_prefix = tx_ip6_base.prefixlen
ip.dst_addr_hi = rx_ip6_base.ip6.hi
ip.dst_addr_lo = rx_ip6_base.ip6.lo
ip.dst_addr_mode = Ip6.kIncHost
ip.dst_addr_count = num_devs_per_vlan
ip.dst_addr_prefix = rx_ip6_base.prefixlen
else:
assert false # unreachable
p = s.protocol.add()
p.protocol_id.id = ost_pb.Protocol.kUdpFieldNumber
p = s.protocol.add()
p.protocol_id.id = ost_pb.Protocol.kPayloadFieldNumber
log.info('configuring tx_stream %d' % stream_id.stream_id[idx].id)
drone.modifyStream(stream_cfg)
# clear arp on DUT
# clear arp/ndp on DUT
for i in range(num_vlans):
vrf = 'vrf' + str(i+1)
@ -902,8 +951,14 @@ def test_multiEmulDevPerVlan(request, drone, ports, dut, dut_ports,
arp_cache = sudo('ip netns exec ' + vrf
+ ' ip neigh show')
assert re.search('10.1.[1-2].20[1-5].*lladdr', arp_cache) == None
assert re.search('1234:[1-2]::\[\da-f]+.*lladdr', arp_cache) == None
# resolve ARP on tx/rx ports
# wait for interface to do DAD? Otherwise we don't get replies for NS
# FIXME: find alternative to sleep
if has_ip6:
time.sleep(5)
# resolve ARP/NDP on tx/rx ports
log.info('resolving Neighbors on tx/rx ports ...')
drone.startCapture(emul_ports)
drone.clearDeviceNeighbors(emul_ports)
@ -911,7 +966,7 @@ def test_multiEmulDevPerVlan(request, drone, ports, dut, dut_ports,
time.sleep(3)
drone.stopCapture(emul_ports)
# verify ARP Requests sent out from tx port
# verify ARP/NDP Requests sent out from tx port
buff = drone.getCaptureBuffer(emul_ports.port_id[0])
drone.saveCaptureBuffer(buff, 'capture.pcap')
log.info('dumping Tx capture buffer (all)')
@ -923,18 +978,30 @@ def test_multiEmulDevPerVlan(request, drone, ports, dut, dut_ports,
print(cap_pkts)
log.info('dumping Tx capture buffer (filtered)')
for i in range(num_vlans):
for j in range(num_devs_per_vlan):
cap_pkts = subprocess.check_output([tshark, '-nr', 'capture.pcap',
'-R', vlan_filter[i] +
' && (arp.opcode == 1)'
' && (arp.src.proto_ipv4 == 10.1.1.'
+str(101+j)+')'
' && (arp.dst.proto_ipv4 == 10.1.1.1)'])
print(cap_pkts)
assert cap_pkts.count('\n') == 1
for j in range(len(ip_ver)):
if ip_ver[j] == 4:
filter = vlan_filter[i] + ' && (arp.opcode == 1)' \
' && (arp.src.proto_ipv4 == 10.1.1.<x>)' \
' && (arp.dst.proto_ipv4 == 10.1.1.1)' \
' && !expert.severity'
elif ip_ver[j] == 6:
filter = vlan_filter[i] + ' && (icmpv6.type == 135)' \
' && (ipv6.src == 1234:1::<x>)' \
' && (icmpv6.nd.ns.target_address == 1234:1::1)' \
' && !expert.severity'
for k in range(num_devs_per_vlan):
if ip_ver[j] == 4:
filter = filter.replace('<x>', str(101+k))
elif ip_ver[j] == 6:
filter = filter.replace('<x>', format(0x65+k, 'x'))
print filter
cap_pkts = subprocess.check_output([tshark, '-nr',
'capture.pcap', '-Y', filter])
print(cap_pkts)
assert cap_pkts.count('\n') == 1
# verify *no* ARP Requests sent out from rx port
buff = drone.getCaptureBuffer(emul_ports.port_id[1])
# verify *no* ARP/NDP Requests sent out from rx port
buff = drone.getCaptureBuffer(emul_ports.port_id[0])
drone.saveCaptureBuffer(buff, 'capture.pcap')
log.info('dumping Rx capture buffer (all)')
cap_pkts = subprocess.check_output([tshark, '-nr', 'capture.pcap'])
@ -945,63 +1012,109 @@ def test_multiEmulDevPerVlan(request, drone, ports, dut, dut_ports,
print(cap_pkts)
log.info('dumping Rx capture buffer (filtered)')
for i in range(num_vlans):
for j in range(num_devs_per_vlan):
cap_pkts = subprocess.check_output([tshark, '-nr', 'capture.pcap',
'-R', vlan_filter[i] +
' && (arp.opcode == 1)'
' && (arp.src.proto_ipv4 == 10.1.2.'
+str(101+j)+')'
' && (arp.dst.proto_ipv4 == 10.1.2.1)'])
print(cap_pkts)
assert cap_pkts.count('\n') == 0
for j in range(len(ip_ver)):
if ip_ver[j] == 4:
filter = vlan_filter[i] + ' && (arp.opcode == 1)' \
' && (arp.src.proto_ipv4 == 10.1.2.<x>)' \
' && (arp.dst.proto_ipv4 == 10.1.2.1)' \
' && !expert.severity'
elif ip_ver[j] == 6:
filter = vlan_filter[i] + ' && (icmpv6.type == 135)' \
' && (ipv6.src == 1234:2::<x>)' \
' && (icmpv6.nd.ns.target_address == 1234:2::1)' \
' && !expert.severity'
for k in range(num_devs_per_vlan):
if ip_ver[j] == 4:
filter = filter.replace('<x>', str(101+k))
elif ip_ver[j] == 6:
filter = filter.replace('<x>', format(0x65+k, 'x'))
print filter
cap_pkts = subprocess.check_output([tshark, '-nr',
'capture.pcap', '-Y', filter])
print(cap_pkts)
assert cap_pkts.count('\n') == 0
# retrieve and verify ARP Table on tx/rx ports
log.info('retrieving ARP entries on tx port')
# retrieve and verify ARP/NDP Table on tx/rx ports
log.info('retrieving ARP/NDP entries on tx port')
device_list = drone.getDeviceList(emul_ports.port_id[0])
device_config = device_list.Extensions[emul.port_device]
neigh_list = drone.getDeviceNeighbors(emul_ports.port_id[0])
devices = neigh_list.Extensions[emul.devices]
log.info('ARP Table on tx port')
log.info('ARP/NDP Table on tx port')
for dev_cfg, device in zip(device_config, devices):
resolved = False
for arp in device.arp:
# TODO: print all configured vlans, not just the first
# TODO: pretty print ip and mac
print('v%d|%08x: %08x %012x' %
(dev_cfg.vlan[0] & 0xffff, dev_cfg.ip4, arp.ip4, arp.mac))
if (arp.ip4 == dev_cfg.ip4_default_gateway) and (arp.mac):
resolved = True
assert resolved
vlans = ''
for v in dev_cfg.vlan:
vlans += str(v & 0xffff) + ' '
if has_ip4:
resolved = False
for arp in device.arp:
print('%s%s: %s %012x' %
(vlans, str(ipaddress.ip_address(dev_cfg.ip4)),
str(ipaddress.ip_address(arp.ip4)), arp.mac))
if (arp.ip4 == dev_cfg.ip4_default_gateway) and (arp.mac):
resolved = True
assert resolved
if has_ip6:
resolved = False
for ndp in device.ndp:
print('%s%s: %s %012x' %
(vlans, str(ip6_address(dev_cfg.ip6)),
str(ip6_address(ndp.ip6)), ndp.mac))
if (ndp.ip6 == dev_cfg.ip6_default_gateway) and (ndp.mac):
resolved = True
assert resolved
log.info('retrieving ARP entries on rx port')
device_list = drone.getDeviceList(emul_ports.port_id[0])
device_config = device_list.Extensions[emul.port_device]
neigh_list = drone.getDeviceNeighbors(emul_ports.port_id[1])
devices = neigh_list.Extensions[emul.devices]
log.info('ARP Table on rx port')
log.info('ARP/NDP Table on rx port')
for dev_cfg, device in zip(device_config, devices):
# verify *no* ARPs learnt on rx port
assert len(device.arp) == 0
for arp in device.arp:
# TODO: pretty print ip and mac
print('v%d|%08x: %08x %012x' %
(dev_cfg.vlan[0] & 0xffff, dev_cfg.ip4, arp.ip4, arp.mac))
vlans = ''
for v in dev_cfg.vlan:
vlans += str(v & 0xffff) + ' '
# verify *no* ARPs/NDPs learnt on rx port
if has_ip4:
for arp in device.arp:
print('%s%s: %s %012x' %
(vlans, str(ipaddress.ip_address(dev_cfg.ip4)),
str(ipaddress.ip_address(arp.ip4)), arp.mac))
assert len(device.arp) == 0
if has_ip6:
for ndp in device.ndp:
print('%s%s: %s %012x' %
(vlans, str(ip6_address(dev_cfg.ip6)),
str(ip6_address(ndp.ip6)), ndp.mac))
assert len(device.ndp) == 0
# ping the tx devices from the DUT
for i in range(num_vlans):
vrf = 'vrf' + str(i+1)
for j in range(num_devs_per_vlan):
out = sudo('ip netns exec ' + vrf
+ ' ping -c3 10.1.1.'+str(101+j), warn_only=True)
assert '100% packet loss' not in out
if has_ip4:
out = sudo('ip netns exec ' + vrf
+ ' ping -c3 10.1.1.'+str(101+j), warn_only=True)
assert '100% packet loss' not in out
if has_ip6:
out = sudo('ip netns exec ' + vrf
+ ' ping -6 -c3 1234:1::'+format(101+j, 'x'),
warn_only=True)
assert '100% packet loss' not in out
# ping the rx devices from the DUT
for i in range(num_vlans):
vrf = 'vrf' + str(i+1)
for j in range(num_devs_per_vlan):
out = sudo('ip netns exec ' + vrf
+ ' ping -c3 10.1.2.'+str(101+j), warn_only=True)
assert '100% packet loss' not in out
if has_ip4:
out = sudo('ip netns exec ' + vrf
+ ' ping -c3 10.1.2.'+str(101+j), warn_only=True)
assert '100% packet loss' not in out
if has_ip6:
out = sudo('ip netns exec ' + vrf
+ ' ping -6 -c3 1234:2::'+format(101+j, 'x'),
warn_only=True)
assert '100% packet loss' not in out
# we are all set - send data stream(s)
drone.startCapture(ports.rx)
@ -1017,22 +1130,34 @@ def test_multiEmulDevPerVlan(request, drone, ports, dut, dut_ports,
log.info('dumping Rx capture buffer (all)')
cap_pkts = subprocess.check_output([tshark, '-nr', 'capture.pcap'])
print(cap_pkts)
log.info('dumping Tx capture buffer (all pkts - vlans only)')
log.info('dumping Rx capture buffer (all pkts - vlans only)')
cap_pkts = subprocess.check_output([tshark, '-nr', 'capture.pcap',
'-Tfields', '-eframe.number', '-eieee8021ad.id', '-evlan.id'])
print(cap_pkts)
log.info('dumping Rx capture buffer (filtered)')
for i in range(num_vlans):
for j in range(num_devs_per_vlan):
cap_pkts = subprocess.check_output(
[tshark, '-nr', 'capture.pcap',
'-R', vlan_filter[i] +
' && (ip.src == 10.1.1.' + str(101+j) + ') '
' && (ip.dst == 10.1.2.' + str(101+j) + ')'
' && (eth.dst == 00:01:02:03:0b:'
+ format(1+j, '02x')+')'])
print(cap_pkts)
assert cap_pkts.count('\n') == 1
if has_ip4:
filter = vlan_filter[i] + \
' && (ip.src == 10.1.1.' + str(101+j) + ')' \
' && (ip.dst == 10.1.2.' + str(101+j) + ')' \
' && (eth.dst == 00:01:02:03:0b:' + format(1+j, '02x')+')'
print('filter: %s' % filter)
cap_pkts = subprocess.check_output([tshark, '-nr',
'capture.pcap', '-Y', filter])
print(cap_pkts)
assert cap_pkts.count('\n') == 1
if has_ip6:
filter = vlan_filter[i] \
+ ' && (ipv6.src == 1234:1::' + format(101+j, 'x') + ')' \
+ ' && (ipv6.dst == 1234:2::' + format(101+j, 'x') + ')' \
+ ' && (eth.dst == 00:01:02:03:0b:' \
+ format(1+j, '02x') + ')'
print('filter: %s' % filter)
cap_pkts = subprocess.check_output([tshark, '-nr',
'capture.pcap', '-Y', filter])
print(cap_pkts)
assert cap_pkts.count('\n') == 1
os.remove('capture.pcap')
import pytest