sign: post tx stream stats collection; testing pending

This commit is contained in:
Srivats P 2016-11-13 19:11:24 +05:30
parent afcb4126b5
commit f8575ef101
8 changed files with 170 additions and 36 deletions

View File

@ -67,3 +67,35 @@ class DroneProxy(object):
os.fsync(f.fileno())
f.close()
def getStreamStatsDict(self, stream_guid_list): # FIXME: rename?
"""
Convenience wrapper method for getStreamStats which returns StreamStats
as a dictionary instead of a List
TODO: document dictionary structure
"""
class StreamStatsDict:
def __repr__(self):
s = 'port: {\n'
for k, v in self.port.items():
s += str(k) + ': {' + str(v) + '}\n'
s += '}'
return s
class StreamStatsDictPort:
def __repr__(self):
s = 'sguid: { '
for k, v in self.sguid.items():
s += str(k) + ': {' + str(v).replace('\n', ' ') + '}\n'
s += '}'
return s
ssl = self.getStreamStats(stream_guid_list)
ssd = StreamStatsDict()
# TODO: ssd.aggr = dict()
ssd.port = dict()
for ss in ssl.stream_stats:
if ss.port_id.id not in ssd.port:
ssd.port[ss.port_id.id] = StreamStatsDictPort()
# TODO: ssd.port[ss.port_id.id].aggr = dict()
ssd.port[ss.port_id.id].sguid = dict()
assert ss.stream_guid.id not in ssd.port[ss.port_id.id].sguid
ssd.port[ss.port_id.id].sguid[ss.stream_guid.id] = ss
return ssd

View File

@ -277,8 +277,8 @@ message StreamGuidList {
}
message StreamStats {
required StreamGuid stream_guid = 1;
required PortId port_id = 2; // FIXME: change to optional - reuse for aggr?
required PortId port_id = 1;
required StreamGuid stream_guid = 2;
optional uint64 rx_pkts = 11;
optional uint64 rx_bytes = 12;

View File

@ -195,4 +195,20 @@ _exit:
return isOk;
}
bool SignProtocol::packetGuid(const uchar *pkt, int pktLen, uint *guid)
{
const uchar *p = pkt + pktLen - sizeof(kSignMagic);
quint32 magic = qFromBigEndian<quint32>(p);
if (magic != kSignMagic)
return false;
p--;
while (*p != kTypeLenEnd) {
if (*p == kTypeLenGuid) {
*guid = qFromBigEndian<quint32>(p - 3) >> 8;
return true;
}
p -= 1 + (*p >> 5); // move to next TLV
}
return false;
}

View File

@ -81,6 +81,7 @@ public:
virtual bool setFieldData(int index, const QVariant &value,
FieldAttrib attrib = FieldValue);
static bool packetGuid(const uchar *pkt, int pktLen, uint *guid);
private:
static const quint32 kSignMagic = 0xa1b2c3d4; // FIXME
static const quint8 kTypeLenEnd = 0x00;

View File

@ -21,6 +21,8 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
#define _PACKET_SEQUENCE_H
#include "pcapextra.h"
#include "../common/sign.h"
#include "streamstats.h"
class PacketSequence
{
@ -46,6 +48,7 @@ public:
}
int appendPacket(const struct pcap_pkthdr *pktHeader,
const uchar *pktData) {
int ret;
if (lastPacket_)
{
usecDuration_ += (pktHeader->ts.tv_sec
@ -57,7 +60,15 @@ public:
bytes_ += pktHeader->caplen;
lastPacket_ = (struct pcap_pkthdr *)
(sendQueue_->buffer + sendQueue_->len);
return pcap_sendqueue_queue(sendQueue_, pktHeader, pktData);
ret = pcap_sendqueue_queue(sendQueue_, pktHeader, pktData);
if (ret >= 0) {
uint guid;
if (SignProtocol::packetGuid(pktData, pktHeader->caplen, &guid)) {
streamStatsMeta_[guid].tx_pkts++;
streamStatsMeta_[guid].tx_bytes += pktHeader->caplen;
}
}
return ret;
}
pcap_send_queue *sendQueue_;
struct pcap_pkthdr *lastPacket_;
@ -67,6 +78,7 @@ public:
int repeatCount_;
int repeatSize_;
long usecDelay_;
StreamStats streamStatsMeta_;
};
#endif

View File

@ -247,6 +247,8 @@ void PcapTxThread::run()
packetSequenceList_.at(i)->usecDuration_);
}
lastStats_ = *stats_; // used for stream stats
state_ = kRunning;
i = 0;
while (i < packetSequenceList_.size())
@ -335,10 +337,7 @@ _restart:
}
_exit:
// TODO: update stream stats
// FIXME: temporary data for testing
streamStats_[1001].tx_pkts = 12345;
streamStats_[1001].tx_bytes = 56789;
updateStreamStats();
state_ = kFinished;
}
@ -423,7 +422,7 @@ int PcapTxThread::sendQueueTransmit(pcap_t *p,
// Step to the next packet in the buffer
hdr = (struct pcap_pkthdr*) (pkt + pktLen);
pkt = (uchar*) ((uchar*)hdr + sizeof(*hdr));
pkt = (uchar*) ((uchar*)hdr + sizeof(*hdr)); // FIXME: superfluous?
if (stop_)
{
@ -434,6 +433,68 @@ int PcapTxThread::sendQueueTransmit(pcap_t *p,
return 0;
}
void PcapTxThread::updateStreamStats()
{
// Number of tx packets sent during last transmit
quint64 pkts = stats_->pkts > lastStats_.pkts ?
stats_->pkts - lastStats_.pkts :
stats_->pkts + (ULLONG_MAX - lastStats_.pkts);
// Calculate -
// number of (complete) repeats of packetList_
// - this is same as number of repeats of each PacketSequence
// number of pkts sent in last partial repeat of packetList_
// - This encompasses 0 or more potentially partial PacketSequence
// FIXME: incorrect, we need to consider the expandedPacketCount_ i.e.
// the configured repeats of each sequence ('n')
int c = pkts/packetCount_;
int d = pkts%packetCount_;
foreach(PacketSequence *seq, packetSequenceList_) {
uint a = c;
if (d >= seq->packets_) {
// last repeat of this seq was complete - so just bump up the
// repeat count and reduce the full seq pkt count from the
// last repeat pkt sent count
// NOTE: if the packet sent count is an exact multiple of
// packetList_ count, then d will be 0 and we will never come
// inside this condition
Q_ASSERT(seq->packets_);
a = c + 1;
d -= seq->packets_;
}
else if (d) { // (d < seq->packets_)
// last repeat of this seq was partial, so we need to traverse
// this sequence upto 'd' pkts to update streamStats
struct pcap_pkthdr *hdr =
(struct pcap_pkthdr*) seq->sendQueue_->buffer;
char *end = seq->sendQueue_->buffer + seq->sendQueue_->len;
while(d && ((char*) hdr < end)) {
uchar *pkt = (uchar*)hdr + sizeof(*hdr);
uint guid;
if (SignProtocol::packetGuid(pkt, hdr->caplen, &guid)) {
streamStats_[guid].tx_pkts++;
streamStats_[guid].tx_bytes += hdr->caplen;
}
// Step to the next packet in the buffer
hdr = (struct pcap_pkthdr*) (pkt + hdr->caplen);
d--;
}
}
StreamStatsIterator i(seq->streamStatsMeta_);
while (i.hasNext()) {
i.next();
uint guid = i.key();
StreamStatsTuple ssm = i.value();
streamStats_[guid].tx_pkts += a * ssm.tx_pkts;
streamStats_[guid].tx_bytes += a * ssm.tx_bytes;
}
}
}
void PcapTxThread::udelay(unsigned long usec)
{
#if defined(Q_OS_WIN32)

View File

@ -22,12 +22,11 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
#include "abstractport.h"
#include "packetsequence.h"
#include "statstuple.h"
#include <QThread>
#include <pcap.h>
class StatsTuple;
class PcapTxThread: public QThread
{
public:
@ -67,6 +66,7 @@ private:
static void udelay(unsigned long usec);
int sendQueueTransmit(pcap_t *p, pcap_send_queue *queue, long &overHead,
int sync);
void updateStreamStats();
QList<PacketSequence*> packetSequenceList_;
PacketSequence *currentPacketSequence_;
@ -85,6 +85,7 @@ private:
volatile State state_;
StatsTuple *stats_;
StatsTuple lastStats_;
StreamStats streamStats_;
};

View File

@ -126,8 +126,8 @@ def ports(request, drone):
assert len(port_config_list.port) != 0
# print port list and find default X/Y ports
port_x_num = -1
port_y_num = -1
ports.x_num = -1
ports.y_num = -1
print port_config_list
print('Port List')
print('---------')
@ -135,32 +135,32 @@ def ports(request, drone):
print('%d.%s (%s)' % (port.port_id.id, port.name, port.description))
# use a vhost port as default X/Y port
if ('vhost' in port.name or 'sun' in port.description.lower()):
if port_x_num < 0:
port_x_num = port.port_id.id
elif port_y_num < 0:
port_y_num = port.port_id.id
if ports.x_num < 0:
ports.x_num = port.port_id.id
elif ports.y_num < 0:
ports.y_num = port.port_id.id
if ('eth1' in port.name):
port_x_num = port.port_id.id
ports.x_num = port.port_id.id
if ('eth2' in port.name):
port_y_num = port.port_id.id
ports.y_num = port.port_id.id
assert port_x_num >= 0
assert port_y_num >= 0
assert ports.x_num >= 0
assert ports.y_num >= 0
print('Using port %d as port X' % port_x_num)
print('Using port %d as port Y' % port_y_num)
print('Using port %d as port X' % ports.x_num)
print('Using port %d as port Y' % ports.y_num)
ports.x = ost_pb.PortIdList()
ports.x.port_id.add().id = port_x_num;
ports.x.port_id.add().id = ports.x_num;
ports.y = ost_pb.PortIdList()
ports.y.port_id.add().id = port_y_num;
ports.y.port_id.add().id = ports.y_num;
# Enable stream stats on ports
portConfig = ost_pb.PortConfigList()
portConfig.port.add().port_id.id = port_x_num;
portConfig.port.add().port_id.id = ports.x_num;
portConfig.port[0].stream_stats_tracking = True;
portConfig.port.add().port_id.id = port_y_num;
portConfig.port.add().port_id.id = ports.y_num;
portConfig.port[1].stream_stats_tracking = True;
print('Enabling Stream Stats tracking on ports X and Y');
drone.modifyPort(portConfig);
@ -352,7 +352,6 @@ def stream_guids(request, drone, ports):
stream_guids = ost_pb.StreamGuidList()
stream_guids.port_id_list.port_id.add().id = ports.x.port_id[0].id;
stream_guids.port_id_list.port_id.add().id = ports.y.port_id[0].id;
stream_guids.stream_guid.add().id = 101
return stream_guids
"""
@ -413,15 +412,22 @@ def test_unidir(drone, ports, dut, dut_ports, dut_ip, emul_ports, dgid_list,
/.101 \.101
HostX HostY
"""
npkts = 14
log.info('configuring stream %d on port X' % stream.stream[0].stream_id.id)
stream.stream[1].control.num_packets = npkts
nbytes = npkts * (stream.stream[1].core.frame_len - 4)
drone.modifyStream(stream)
# clear port X/Y stats
log.info('clearing stats')
drone.clearStats(ports.x)
drone.clearStats(ports.y)
drone.clearStreamStats(ports.x)
drone.clearStreamStats(ports.y)
drone.clearStreamStats(stream_guids)
# verify stream strats are indeed cleared
ssd = drone.getStreamStatsDict(stream_guids)
assert len(ssd.port) == 0
# resolve ARP/NDP on ports X/Y
log.info('resolving Neighbors on (X, Y) ports ...')
@ -460,16 +466,18 @@ def test_unidir(drone, ports, dut, dut_ports, dut_ip, emul_ports, dgid_list,
cap_pkts = subprocess.check_output([tshark, '-n', '-r', 'capture.pcap',
'-Y', filter])
print(cap_pkts)
assert cap_pkts.count('\n') == 10
assert cap_pkts.count('\n') == npkts
os.remove('capture.pcap')
# verify stream stats
stream_guids.ClearField("stream_guid");
stream_stats_list = drone.getStreamStats(stream_guids)
log.info('--> (stream_stats)' + stream_stats_list.__str__())
assert len(stream_stats_list.stream_stats) == 2
# FIXME: verify stream stats
ssd = drone.getStreamStatsDict(stream_guids)
log.info('--> (stream stats)\n' + str(ssd))
# FIXME (temp): assert len(ssd.port) == 2
assert len(ssd.port[ports.x_num].sguid) == 1
assert ssd.port[ports.x_num].sguid[101].tx_pkts == npkts
assert ssd.port[ports.x_num].sguid[101].tx_bytes == nbytes
assert len(ssd.port[ports.y_num].sguid) == 1
# TODO: rx = tx
except RpcError as e:
raise
@ -484,4 +492,7 @@ def test_unidir(drone, ports, dut, dut_ports, dut_ip, emul_ports, dgid_list,
# * Verify that bi-directional stream stats are correct for multiple streams
# * Verify protocol combinations - Eth, IPv4/IPv6, TCP/UDP, Pattern
# * Verify transmit modes
# * Verify tx pkts = multiple repeats of packetList + partial repeat
# * Verify tx pkts = multiple complete repeats of packetList + no partials
# * Verify tx pkts = first seq partial
#