sign: fix the tx stream stats calculation and update associated test case

This commit is contained in:
Srivats P 2016-11-15 22:23:55 +05:30
parent f8575ef101
commit defdc218bd
3 changed files with 261 additions and 79 deletions

View File

@ -92,6 +92,7 @@ void PcapTxThread::clearPacketList()
repeatSize_ = 0;
packetCount_ = 0;
packetListSize_ = 0;
returnToQIdx_ = -1;
setPacketListLoopMode(false, 0, 0);
@ -153,6 +154,8 @@ bool PcapTxThread::appendToPacketList(long sec, long nsec,
}
packetCount_++;
packetListSize_ += repeatSize_ ?
currentPacketSequence_->repeatCount_ : 1;
if (repeatSize_ > 0 && packetCount_ == repeatSize_)
{
qDebug("repeatSequenceStart_=%d, repeatSize_ = %llu",
@ -441,58 +444,99 @@ void PcapTxThread::updateStreamStats()
stats_->pkts + (ULLONG_MAX - lastStats_.pkts);
// Calculate -
// number of (complete) repeats of packetList_
// - this is same as number of repeats of each PacketSequence
// number of complete repeats of packetList_
// => each PacketSet in the packetList is repeated these many times
// number of pkts sent in last partial repeat of packetList_
// - This encompasses 0 or more potentially partial PacketSequence
// FIXME: incorrect, we need to consider the expandedPacketCount_ i.e.
// the configured repeats of each sequence ('n')
int c = pkts/packetCount_;
int d = pkts%packetCount_;
foreach(PacketSequence *seq, packetSequenceList_) {
uint a = c;
if (d >= seq->packets_) {
// last repeat of this seq was complete - so just bump up the
// repeat count and reduce the full seq pkt count from the
// last repeat pkt sent count
// NOTE: if the packet sent count is an exact multiple of
// packetList_ count, then d will be 0 and we will never come
// inside this condition
Q_ASSERT(seq->packets_);
a = c + 1;
d -= seq->packets_;
}
else if (d) { // (d < seq->packets_)
// last repeat of this seq was partial, so we need to traverse
// this sequence upto 'd' pkts to update streamStats
struct pcap_pkthdr *hdr =
(struct pcap_pkthdr*) seq->sendQueue_->buffer;
char *end = seq->sendQueue_->buffer + seq->sendQueue_->len;
// - This encompasses 0 or more potentially partial PacketSets
// XXX: Note for the above, we consider a PacketSet to include its
// own repeats within itself
int c = pkts/packetListSize_;
int d = pkts%packetListSize_;
while(d && ((char*) hdr < end)) {
uchar *pkt = (uchar*)hdr + sizeof(*hdr);
uint guid;
int i;
if (SignProtocol::packetGuid(pkt, hdr->caplen, &guid)) {
streamStats_[guid].tx_pkts++;
streamStats_[guid].tx_bytes += hdr->caplen;
}
if (!c)
goto _last_repeat;
// Step to the next packet in the buffer
hdr = (struct pcap_pkthdr*) (pkt + hdr->caplen);
d--;
i = 0;
while (i < packetSequenceList_.size()) {
PacketSequence *seq = packetSequenceList_.at(i);
int rptSz = seq->repeatSize_;
int rptCnt = seq->repeatCount_;
for (int k = 0; k < rptSz; k++) {
seq = packetSequenceList_.at(i+k);
StreamStatsIterator iter(seq->streamStatsMeta_);
while (iter.hasNext()) {
iter.next();
uint guid = iter.key();
StreamStatsTuple ssm = iter.value();
streamStats_[guid].tx_pkts += c * rptCnt * ssm.tx_pkts;
streamStats_[guid].tx_bytes += c * rptCnt * ssm.tx_bytes;
}
}
StreamStatsIterator i(seq->streamStatsMeta_);
while (i.hasNext()) {
i.next();
uint guid = i.key();
StreamStatsTuple ssm = i.value();
streamStats_[guid].tx_pkts += a * ssm.tx_pkts;
streamStats_[guid].tx_bytes += a * ssm.tx_bytes;
}
// Move to the next Packet Set
i += rptSz;
}
_last_repeat:
if (!d)
goto _done;
i = 0;
while (i < packetSequenceList_.size()) {
PacketSequence *seq = packetSequenceList_.at(i);
int rptSz = seq->repeatSize_;
int rptCnt = seq->repeatCount_;
for (int j = 0; j < rptCnt; j++) {
for (int k = 0; k < rptSz; k++) {
seq = packetSequenceList_.at(i+k);
Q_ASSERT(seq->packets_);
if (d >= seq->packets_) {
// All packets of this seq were sent
StreamStatsIterator iter(seq->streamStatsMeta_);
while (iter.hasNext()) {
iter.next();
uint guid = iter.key();
StreamStatsTuple ssm = iter.value();
streamStats_[guid].tx_pkts += ssm.tx_pkts;
streamStats_[guid].tx_bytes += ssm.tx_bytes;
}
d -= seq->packets_;
}
else { // (d < seq->packets_)
// not all packets of this seq were sent, so we need to
// traverse this seq upto 'd' pkts, parse guid from the
// packet and update streamStats
struct pcap_pkthdr *hdr =
(struct pcap_pkthdr*) seq->sendQueue_->buffer;
char *end = seq->sendQueue_->buffer + seq->sendQueue_->len;
while(d && ((char*) hdr < end)) {
uchar *pkt = (uchar*)hdr + sizeof(*hdr);
uint guid;
if (SignProtocol::packetGuid(pkt, hdr->caplen, &guid)) {
streamStats_[guid].tx_pkts++;
streamStats_[guid].tx_bytes += hdr->caplen;
}
// Step to the next packet in the buffer
hdr = (struct pcap_pkthdr*) (pkt + hdr->caplen);
d--;
}
Q_ASSERT(d == 0);
goto _done;
}
}
}
// Move to the next Packet Set
i += rptSz;
}
_done:
return;
}
void PcapTxThread::udelay(unsigned long usec)

View File

@ -68,12 +68,15 @@ private:
int sync);
void updateStreamStats();
QList<PacketSequence*> packetSequenceList_;
// Intermediate state variables used while building the packet list
PacketSequence *currentPacketSequence_;
int repeatSequenceStart_;
quint64 repeatSize_;
quint64 packetCount_;
QList<PacketSequence*> packetSequenceList_;
quint64 packetListSize_; // count of pkts in packet List including repeats
int returnToQIdx_;
quint64 loopDelay_;

View File

@ -5,6 +5,7 @@ import ipaddress
import logging
import os
import pytest
import random
import subprocess
import sys
import time
@ -198,6 +199,7 @@ def dgid_list(request, drone, ports):
dg = devgrp_cfg.device_group.add()
dg.device_group_id.id = dgid_list.x.device_group_id[0].id
dg.core.name = "HostX"
dg.device_count = 5
dg.Extensions[emul.mac].address = 0x000102030a01
@ -290,16 +292,16 @@ def stream_clear(request, drone, ports):
sid_list = drone.getStreamIdList(ports.x.port_id[0])
drone.deleteStream(sid_list)
@pytest.fixture(scope='module')
def stream(request, drone, ports):
@pytest.fixture
def stream(request, drone, ports, sign_stream_cfg):
# add stream(s)
stream_id = ost_pb.StreamIdList()
stream_id.port_id.CopyFrom(ports.x.port_id[0])
stream_id.stream_id.add().id = 1 # Unsigned stream
stream_id.stream_id.add().id = 101 # Signed stream
log.info('adding X stream(s) %d %d' %
(stream_id.stream_id[0].id, stream_id.stream_id[1].id))
stream_id.stream_id.add().id = 1 # Unsigned stream
stream_id.stream_id.add().id = 2 # Signed stream
stream_id.stream_id.add().id = 3 # Signed stream
log.info('adding X stream(s) ' + str(stream_id))
drone.addStream(stream_id)
# configure the stream(s)
@ -308,11 +310,19 @@ def stream(request, drone, ports):
s = stream_cfg.stream.add()
s.stream_id.id = stream_id.stream_id[0].id
s.core.is_enabled = True
s.control.packets_per_sec = 100
s.control.num_packets = 10
# On Win32, packets in a seq that take less than 1s to transmit
# are transmitted in one shot using pcap_send_queue; this means that
# stopTransmit() cannot stop them in the middle of the sequence and
# stop will happen only at the pktSeq boundary
# Since we want to test cases where stop is trigerred in the middle of
# a sequence, we set pps < 1000
# FIXME: for some reason values such as 100pps is also leading to
# stop only at seq boundary - need to debug
s.control.packets_per_sec = 523
# setup (unsigned) stream protocols as mac:eth2:ip:payload
p = s.protocol.add()
s.control.num_packets = 10
p.protocol_id.id = ost_pb.Protocol.kMacFieldNumber
p.Extensions[mac].dst_mac_mode = Mac.e_mm_resolve
p.Extensions[mac].src_mac_mode = Mac.e_mm_resolve
@ -326,11 +336,16 @@ def stream(request, drone, ports):
s.protocol.add().protocol_id.id = ost_pb.Protocol.kPayloadFieldNumber
# setup (signed) stream protocols as mac:eth2:ip:udp:payload
# setup (signed) stream protocols as mac:eth2:ip:udp|tcp:payload
# Remove payload, add udp, payload and sign protocol to signed stream(s)
sscfg = sign_stream_cfg
s = stream_cfg.stream.add()
s.CopyFrom(stream_cfg.stream[0])
s.stream_id.id = stream_id.stream_id[1].id
s.control.num_packets = sscfg['num_pkts'][0]
if sscfg['num_var_pkts'][0]:
s.protocol[2].Extensions[ip4].src_ip_mode = Ip4.e_im_inc_host
s.protocol[2].Extensions[ip4].src_ip_count = sscfg['num_var_pkts'][0]
del s.protocol[-1]
s.protocol.add().protocol_id.id = ost_pb.Protocol.kUdpFieldNumber
s.protocol.add().protocol_id.id = ost_pb.Protocol.kPayloadFieldNumber
@ -338,9 +353,25 @@ def stream(request, drone, ports):
p.protocol_id.id = ost_pb.Protocol.kSignFieldNumber
p.Extensions[sign].stream_guid = 101
s = stream_cfg.stream.add()
s.CopyFrom(stream_cfg.stream[1])
s.stream_id.id = stream_id.stream_id[2].id
s.core.frame_len = 128
s.control.num_packets = sscfg['num_pkts'][1]
if sscfg['num_var_pkts'][1]:
s.protocol[2].Extensions[ip4].src_ip_mode = Ip4.e_im_inc_host
s.protocol[2].Extensions[ip4].src_ip_count = sscfg['num_var_pkts'][1]
s.protocol[-3].protocol_id.id = ost_pb.Protocol.kTcpFieldNumber
s.protocol[-1].Extensions[sign].stream_guid = 102
if sscfg['loop']:
s.control.next = ost_pb.StreamControl.e_nw_goto_id
drone.modifyStream(stream_cfg)
def fin():
# delete streams
log.info('deleting tx_stream %d' % stream_id.stream_id[0].id)
log.info('deleting tx_stream ' + str(stream_id))
drone.deleteStream(stream_id)
request.addfinalizer(fin)
@ -399,8 +430,13 @@ def stream_toggle_payload(request, drone, ports):
# ----------------------------------------------------------------- #
# ================================================================= #
@pytest.mark.parametrize('sign_stream_cfg', [
{'num_pkts': [270, 512], 'num_var_pkts': [0, 0], 'loop': False},
{'num_pkts': [276, 510], 'num_var_pkts': [0, 0], 'loop': True},
#{'num_pkts': [19, 30], 'num_var_pkts': [0, 0], 'loop': True},
])
def test_unidir(drone, ports, dut, dut_ports, dut_ip, emul_ports, dgid_list,
stream_clear, stream, stream_guids):
sign_stream_cfg, stream_clear, stream, stream_guids):
""" TESTCASE: Verify that uni-directional stream stats are correct for a
single signed stream X --> Y
DUT
@ -412,12 +448,10 @@ def test_unidir(drone, ports, dut, dut_ports, dut_ip, emul_ports, dgid_list,
/.101 \.101
HostX HostY
"""
npkts = 14
log.info('configuring stream %d on port X' % stream.stream[0].stream_id.id)
stream.stream[1].control.num_packets = npkts
nbytes = npkts * (stream.stream[1].core.frame_len - 4)
drone.modifyStream(stream)
# calculate tx pkts
total_tx_pkts = 10 + sign_stream_cfg['num_pkts'][0] \
+ sign_stream_cfg['num_pkts'][1]
# clear port X/Y stats
log.info('clearing stats')
drone.clearStats(ports.x)
@ -428,7 +462,6 @@ def test_unidir(drone, ports, dut, dut_ports, dut_ip, emul_ports, dgid_list,
ssd = drone.getStreamStatsDict(stream_guids)
assert len(ssd.port) == 0
# resolve ARP/NDP on ports X/Y
log.info('resolving Neighbors on (X, Y) ports ...')
drone.resolveDeviceNeighbors(emul_ports)
@ -438,46 +471,148 @@ def test_unidir(drone, ports, dut, dut_ports, dut_ip, emul_ports, dgid_list,
try:
drone.startCapture(ports.y)
drone.startCapture(ports.x)
time.sleep(1)
drone.startTransmit(ports.x)
log.info('waiting for transmit to finish ...')
time.sleep(3)
time.sleep(random.randint(3, 10))
drone.stopTransmit(ports.x)
time.sleep(1)
drone.stopCapture(ports.x)
drone.stopCapture(ports.y)
# verify port stats
x_stats = drone.getStats(ports.x)
log.info('--> (x_stats)' + x_stats.__str__())
assert(x_stats.port_stats[0].tx_pkts >= 20)
if not sign_stream_cfg['loop']:
assert(x_stats.port_stats[0].tx_pkts >= total_tx_pkts)
y_stats = drone.getStats(ports.y)
log.info('--> (y_stats)' + y_stats.__str__())
assert(y_stats.port_stats[0].rx_pkts >= 20)
if not sign_stream_cfg['loop']:
assert(y_stats.port_stats[0].rx_pkts >= total_tx_pkts)
ssd = drone.getStreamStatsDict(stream_guids)
# FIXME (temp): assert len(ssd.port) == 2
# dump X capture buffer
log.info('getting X capture buffer')
buff = drone.getCaptureBuffer(ports.x.port_id[0])
drone.saveCaptureBuffer(buff, 'capX.pcap')
log.info('dumping X capture buffer')
cap_pkts = subprocess.check_output([tshark, '-n', '-r', 'capX.pcap'])
print(cap_pkts)
# get sign stream #1 Tx stats from capture
filter="frame[-9:9]==00.00.00.65.61.a1.b2.c3.d4"
print(filter)
log.info('dumping X capture buffer (filtered)')
cap_pkts = subprocess.check_output([tshark, '-n', '-r', 'capX.pcap',
'-Y', filter])
print(cap_pkts)
sign_stream1_cnt = cap_pkts.count('\n')
# get sign stream #2 Tx stats from capture
filter="frame[-9:9]==00.00.00.66.61.a1.b2.c3.d4"
print(filter)
log.info('dumping Y capture buffer (filtered)')
cap_pkts = subprocess.check_output([tshark, '-n', '-r', 'capX.pcap',
'-Y', filter])
print(cap_pkts)
sign_stream2_cnt = cap_pkts.count('\n')
os.remove('capX.pcap')
log.info('sign stream 101 tx cap count: %d' % (sign_stream1_cnt))
log.info('sign stream 102 tx cap count: %d' % (sign_stream2_cnt))
log.info('--> (stream stats)\n' + str(ssd))
# verify tx stream stats from drone is same as that from capture
assert len(ssd.port[ports.x_num].sguid) == 2
if sign_stream_cfg['loop']:
assert ssd.port[ports.x_num].sguid[101].tx_pkts \
== sign_stream1_cnt
assert ssd.port[ports.x_num].sguid[101].tx_bytes \
== (sign_stream1_cnt
* (stream.stream[1].core.frame_len - 4))
assert ssd.port[ports.x_num].sguid[102].tx_pkts \
== sign_stream2_cnt
assert ssd.port[ports.x_num].sguid[102].tx_bytes \
== (sign_stream2_cnt
* (stream.stream[2].core.frame_len - 4))
else:
assert ssd.port[ports.x_num].sguid[101].tx_pkts \
== sign_stream_cfg['num_pkts'][0]
assert ssd.port[ports.x_num].sguid[101].tx_bytes \
== (sign_stream_cfg['num_pkts'][0]
* (stream.stream[1].core.frame_len - 4))
assert ssd.port[ports.x_num].sguid[102].tx_pkts \
== sign_stream_cfg['num_pkts'][1]
assert ssd.port[ports.x_num].sguid[102].tx_bytes \
== (sign_stream_cfg['num_pkts'][1]
* (stream.stream[2].core.frame_len - 4))
# dump Y capture buffer
log.info('getting Y capture buffer')
buff = drone.getCaptureBuffer(ports.y.port_id[0])
drone.saveCaptureBuffer(buff, 'capture.pcap')
drone.saveCaptureBuffer(buff, 'capY.pcap')
log.info('dumping Y capture buffer')
cap_pkts = subprocess.check_output([tshark, '-n', '-r', 'capture.pcap'])
cap_pkts = subprocess.check_output([tshark, '-n', '-r', 'capY.pcap'])
print(cap_pkts)
# get sign stream #1 Rx stats from capture
filter="frame[-9:9]==00.00.00.65.61.a1.b2.c3.d4"
print(filter)
log.info('dumping Y capture buffer (filtered)')
cap_pkts = subprocess.check_output([tshark, '-n', '-r', 'capture.pcap',
cap_pkts = subprocess.check_output([tshark, '-n', '-r', 'capY.pcap',
'-Y', filter])
print(cap_pkts)
assert cap_pkts.count('\n') == npkts
os.remove('capture.pcap')
sign_stream1_cnt = cap_pkts.count('\n')
# verify stream stats
ssd = drone.getStreamStatsDict(stream_guids)
# get sign stream #2 Rx stats from capture
filter="frame[-9:9]==00.00.00.66.61.a1.b2.c3.d4"
print(filter)
log.info('dumping Y capture buffer (filtered)')
cap_pkts = subprocess.check_output([tshark, '-n', '-r', 'capY.pcap',
'-Y', filter])
print(cap_pkts)
sign_stream2_cnt = cap_pkts.count('\n')
os.remove('capY.pcap')
log.info('--> (x_stats)' + x_stats.__str__())
log.info('--> (y_stats)' + y_stats.__str__())
log.info('sign stream 101 rx cap count: %d' % (sign_stream1_cnt))
log.info('sign stream 102 rx cap count: %d' % (sign_stream2_cnt))
log.info('--> (stream stats)\n' + str(ssd))
# FIXME (temp): assert len(ssd.port) == 2
assert len(ssd.port[ports.x_num].sguid) == 1
assert ssd.port[ports.x_num].sguid[101].tx_pkts == npkts
assert ssd.port[ports.x_num].sguid[101].tx_bytes == nbytes
assert len(ssd.port[ports.y_num].sguid) == 1
# TODO: rx = tx
"""
# verify rx stream stats from drone is same as that from capture
assert len(ssd.port[ports.y_num].sguid) == 2
if sign_stream_cfg['loop']:
assert ssd.port[ports.y_num].sguid[101].tx_pkts \
== sign_stream1_cnt
assert ssd.port[ports.y_num].sguid[101].tx_bytes \
== (sign_stream1_cnt
* (stream.stream[1].core.frame_len - 4))
assert ssd.port[ports.y_num].sguid[102].tx_pkts \
== sign_stream2_cnt
assert ssd.port[ports.y_num].sguid[102].tx_bytes \
== (sign_stream2_cnt
* (stream.stream[2].core.frame_len - 4))
else:
assert ssd.port[ports.y_num].sguid[101].tx_pkts \
== sign_stream_cfg['num_pkts'][0]
assert ssd.port[ports.y_num].sguid[101].tx_bytes \
== (sign_stream_cfg['num_pkts'][0]
* (stream.stream[1].core.frame_len - 4))
assert ssd.port[ports.y_num].sguid[102].tx_pkts \
== sign_stream_cfg['num_pkts'][1]
assert ssd.port[ports.y_num].sguid[102].tx_bytes \
== (sign_stream_cfg['num_pkts'][1]
* (stream.stream[2].core.frame_len - 4))
"""
# TODO?: rx = tx
except RpcError as e:
raise