diff --git a/binding/core.py b/binding/core.py index c89020b..60d3330 100644 --- a/binding/core.py +++ b/binding/core.py @@ -67,3 +67,35 @@ class DroneProxy(object): os.fsync(f.fileno()) f.close() + def getStreamStatsDict(self, stream_guid_list): # FIXME: rename? + """ + Convenience wrapper method for getStreamStats which returns StreamStats + as a dictionary instead of a List + TODO: document dictionary structure + """ + class StreamStatsDict: + def __repr__(self): + s = 'port: {\n' + for k, v in self.port.items(): + s += str(k) + ': {' + str(v) + '}\n' + s += '}' + return s + class StreamStatsDictPort: + def __repr__(self): + s = 'sguid: { ' + for k, v in self.sguid.items(): + s += str(k) + ': {' + str(v).replace('\n', ' ') + '}\n' + s += '}' + return s + ssl = self.getStreamStats(stream_guid_list) + ssd = StreamStatsDict() + # TODO: ssd.aggr = dict() + ssd.port = dict() + for ss in ssl.stream_stats: + if ss.port_id.id not in ssd.port: + ssd.port[ss.port_id.id] = StreamStatsDictPort() + # TODO: ssd.port[ss.port_id.id].aggr = dict() + ssd.port[ss.port_id.id].sguid = dict() + assert ss.stream_guid.id not in ssd.port[ss.port_id.id].sguid + ssd.port[ss.port_id.id].sguid[ss.stream_guid.id] = ss + return ssd diff --git a/common/protocol.proto b/common/protocol.proto index 600ce56..da91021 100644 --- a/common/protocol.proto +++ b/common/protocol.proto @@ -277,8 +277,8 @@ message StreamGuidList { } message StreamStats { - required StreamGuid stream_guid = 1; - required PortId port_id = 2; // FIXME: change to optional - reuse for aggr? + required PortId port_id = 1; + required StreamGuid stream_guid = 2; optional uint64 rx_pkts = 11; optional uint64 rx_bytes = 12; diff --git a/common/sign.cpp b/common/sign.cpp index 519ed71..8ac7940 100644 --- a/common/sign.cpp +++ b/common/sign.cpp @@ -195,4 +195,20 @@ _exit: return isOk; } +bool SignProtocol::packetGuid(const uchar *pkt, int pktLen, uint *guid) +{ + const uchar *p = pkt + pktLen - sizeof(kSignMagic); + quint32 magic = qFromBigEndian(p); + if (magic != kSignMagic) + return false; + p--; + while (*p != kTypeLenEnd) { + if (*p == kTypeLenGuid) { + *guid = qFromBigEndian(p - 3) >> 8; + return true; + } + p -= 1 + (*p >> 5); // move to next TLV + } + return false; +} diff --git a/common/sign.h b/common/sign.h index 78861a4..1b8cf03 100644 --- a/common/sign.h +++ b/common/sign.h @@ -81,6 +81,7 @@ public: virtual bool setFieldData(int index, const QVariant &value, FieldAttrib attrib = FieldValue); + static bool packetGuid(const uchar *pkt, int pktLen, uint *guid); private: static const quint32 kSignMagic = 0xa1b2c3d4; // FIXME static const quint8 kTypeLenEnd = 0x00; diff --git a/server/packetsequence.h b/server/packetsequence.h index 494ba2b..402faca 100644 --- a/server/packetsequence.h +++ b/server/packetsequence.h @@ -21,6 +21,8 @@ along with this program. If not, see #define _PACKET_SEQUENCE_H #include "pcapextra.h" +#include "../common/sign.h" +#include "streamstats.h" class PacketSequence { @@ -46,6 +48,7 @@ public: } int appendPacket(const struct pcap_pkthdr *pktHeader, const uchar *pktData) { + int ret; if (lastPacket_) { usecDuration_ += (pktHeader->ts.tv_sec @@ -57,7 +60,15 @@ public: bytes_ += pktHeader->caplen; lastPacket_ = (struct pcap_pkthdr *) (sendQueue_->buffer + sendQueue_->len); - return pcap_sendqueue_queue(sendQueue_, pktHeader, pktData); + ret = pcap_sendqueue_queue(sendQueue_, pktHeader, pktData); + if (ret >= 0) { + uint guid; + if (SignProtocol::packetGuid(pktData, pktHeader->caplen, &guid)) { + streamStatsMeta_[guid].tx_pkts++; + streamStatsMeta_[guid].tx_bytes += pktHeader->caplen; + } + } + return ret; } pcap_send_queue *sendQueue_; struct pcap_pkthdr *lastPacket_; @@ -67,6 +78,7 @@ public: int repeatCount_; int repeatSize_; long usecDelay_; + StreamStats streamStatsMeta_; }; #endif diff --git a/server/pcaptxthread.cpp b/server/pcaptxthread.cpp index e58d5ef..183a7d4 100644 --- a/server/pcaptxthread.cpp +++ b/server/pcaptxthread.cpp @@ -247,6 +247,8 @@ void PcapTxThread::run() packetSequenceList_.at(i)->usecDuration_); } + lastStats_ = *stats_; // used for stream stats + state_ = kRunning; i = 0; while (i < packetSequenceList_.size()) @@ -335,10 +337,7 @@ _restart: } _exit: - // TODO: update stream stats - // FIXME: temporary data for testing - streamStats_[1001].tx_pkts = 12345; - streamStats_[1001].tx_bytes = 56789; + updateStreamStats(); state_ = kFinished; } @@ -423,7 +422,7 @@ int PcapTxThread::sendQueueTransmit(pcap_t *p, // Step to the next packet in the buffer hdr = (struct pcap_pkthdr*) (pkt + pktLen); - pkt = (uchar*) ((uchar*)hdr + sizeof(*hdr)); + pkt = (uchar*) ((uchar*)hdr + sizeof(*hdr)); // FIXME: superfluous? if (stop_) { @@ -434,6 +433,68 @@ int PcapTxThread::sendQueueTransmit(pcap_t *p, return 0; } +void PcapTxThread::updateStreamStats() +{ + // Number of tx packets sent during last transmit + quint64 pkts = stats_->pkts > lastStats_.pkts ? + stats_->pkts - lastStats_.pkts : + stats_->pkts + (ULLONG_MAX - lastStats_.pkts); + + // Calculate - + // number of (complete) repeats of packetList_ + // - this is same as number of repeats of each PacketSequence + // number of pkts sent in last partial repeat of packetList_ + // - This encompasses 0 or more potentially partial PacketSequence + // FIXME: incorrect, we need to consider the expandedPacketCount_ i.e. + // the configured repeats of each sequence ('n') + int c = pkts/packetCount_; + int d = pkts%packetCount_; + foreach(PacketSequence *seq, packetSequenceList_) { + uint a = c; + if (d >= seq->packets_) { + // last repeat of this seq was complete - so just bump up the + // repeat count and reduce the full seq pkt count from the + // last repeat pkt sent count + // NOTE: if the packet sent count is an exact multiple of + // packetList_ count, then d will be 0 and we will never come + // inside this condition + Q_ASSERT(seq->packets_); + a = c + 1; + d -= seq->packets_; + } + else if (d) { // (d < seq->packets_) + // last repeat of this seq was partial, so we need to traverse + // this sequence upto 'd' pkts to update streamStats + struct pcap_pkthdr *hdr = + (struct pcap_pkthdr*) seq->sendQueue_->buffer; + char *end = seq->sendQueue_->buffer + seq->sendQueue_->len; + + while(d && ((char*) hdr < end)) { + uchar *pkt = (uchar*)hdr + sizeof(*hdr); + uint guid; + + if (SignProtocol::packetGuid(pkt, hdr->caplen, &guid)) { + streamStats_[guid].tx_pkts++; + streamStats_[guid].tx_bytes += hdr->caplen; + } + + // Step to the next packet in the buffer + hdr = (struct pcap_pkthdr*) (pkt + hdr->caplen); + d--; + } + } + + StreamStatsIterator i(seq->streamStatsMeta_); + while (i.hasNext()) { + i.next(); + uint guid = i.key(); + StreamStatsTuple ssm = i.value(); + streamStats_[guid].tx_pkts += a * ssm.tx_pkts; + streamStats_[guid].tx_bytes += a * ssm.tx_bytes; + } + } +} + void PcapTxThread::udelay(unsigned long usec) { #if defined(Q_OS_WIN32) diff --git a/server/pcaptxthread.h b/server/pcaptxthread.h index d77b1cf..22c4c50 100644 --- a/server/pcaptxthread.h +++ b/server/pcaptxthread.h @@ -22,12 +22,11 @@ along with this program. If not, see #include "abstractport.h" #include "packetsequence.h" +#include "statstuple.h" #include #include -class StatsTuple; - class PcapTxThread: public QThread { public: @@ -67,6 +66,7 @@ private: static void udelay(unsigned long usec); int sendQueueTransmit(pcap_t *p, pcap_send_queue *queue, long &overHead, int sync); + void updateStreamStats(); QList packetSequenceList_; PacketSequence *currentPacketSequence_; @@ -85,6 +85,7 @@ private: volatile State state_; StatsTuple *stats_; + StatsTuple lastStats_; StreamStats streamStats_; }; diff --git a/test/streamstatstest.py b/test/streamstatstest.py index d76cede..ba012bd 100644 --- a/test/streamstatstest.py +++ b/test/streamstatstest.py @@ -126,8 +126,8 @@ def ports(request, drone): assert len(port_config_list.port) != 0 # print port list and find default X/Y ports - port_x_num = -1 - port_y_num = -1 + ports.x_num = -1 + ports.y_num = -1 print port_config_list print('Port List') print('---------') @@ -135,32 +135,32 @@ def ports(request, drone): print('%d.%s (%s)' % (port.port_id.id, port.name, port.description)) # use a vhost port as default X/Y port if ('vhost' in port.name or 'sun' in port.description.lower()): - if port_x_num < 0: - port_x_num = port.port_id.id - elif port_y_num < 0: - port_y_num = port.port_id.id + if ports.x_num < 0: + ports.x_num = port.port_id.id + elif ports.y_num < 0: + ports.y_num = port.port_id.id if ('eth1' in port.name): - port_x_num = port.port_id.id + ports.x_num = port.port_id.id if ('eth2' in port.name): - port_y_num = port.port_id.id + ports.y_num = port.port_id.id - assert port_x_num >= 0 - assert port_y_num >= 0 + assert ports.x_num >= 0 + assert ports.y_num >= 0 - print('Using port %d as port X' % port_x_num) - print('Using port %d as port Y' % port_y_num) + print('Using port %d as port X' % ports.x_num) + print('Using port %d as port Y' % ports.y_num) ports.x = ost_pb.PortIdList() - ports.x.port_id.add().id = port_x_num; + ports.x.port_id.add().id = ports.x_num; ports.y = ost_pb.PortIdList() - ports.y.port_id.add().id = port_y_num; + ports.y.port_id.add().id = ports.y_num; # Enable stream stats on ports portConfig = ost_pb.PortConfigList() - portConfig.port.add().port_id.id = port_x_num; + portConfig.port.add().port_id.id = ports.x_num; portConfig.port[0].stream_stats_tracking = True; - portConfig.port.add().port_id.id = port_y_num; + portConfig.port.add().port_id.id = ports.y_num; portConfig.port[1].stream_stats_tracking = True; print('Enabling Stream Stats tracking on ports X and Y'); drone.modifyPort(portConfig); @@ -352,7 +352,6 @@ def stream_guids(request, drone, ports): stream_guids = ost_pb.StreamGuidList() stream_guids.port_id_list.port_id.add().id = ports.x.port_id[0].id; stream_guids.port_id_list.port_id.add().id = ports.y.port_id[0].id; - stream_guids.stream_guid.add().id = 101 return stream_guids """ @@ -413,15 +412,22 @@ def test_unidir(drone, ports, dut, dut_ports, dut_ip, emul_ports, dgid_list, /.101 \.101 HostX HostY """ + npkts = 14 log.info('configuring stream %d on port X' % stream.stream[0].stream_id.id) + stream.stream[1].control.num_packets = npkts + nbytes = npkts * (stream.stream[1].core.frame_len - 4) drone.modifyStream(stream) # clear port X/Y stats log.info('clearing stats') drone.clearStats(ports.x) drone.clearStats(ports.y) - drone.clearStreamStats(ports.x) - drone.clearStreamStats(ports.y) + drone.clearStreamStats(stream_guids) + + # verify stream strats are indeed cleared + ssd = drone.getStreamStatsDict(stream_guids) + assert len(ssd.port) == 0 + # resolve ARP/NDP on ports X/Y log.info('resolving Neighbors on (X, Y) ports ...') @@ -460,16 +466,18 @@ def test_unidir(drone, ports, dut, dut_ports, dut_ip, emul_ports, dgid_list, cap_pkts = subprocess.check_output([tshark, '-n', '-r', 'capture.pcap', '-Y', filter]) print(cap_pkts) - assert cap_pkts.count('\n') == 10 + assert cap_pkts.count('\n') == npkts os.remove('capture.pcap') # verify stream stats - stream_guids.ClearField("stream_guid"); - stream_stats_list = drone.getStreamStats(stream_guids) - log.info('--> (stream_stats)' + stream_stats_list.__str__()) - assert len(stream_stats_list.stream_stats) == 2 - - # FIXME: verify stream stats + ssd = drone.getStreamStatsDict(stream_guids) + log.info('--> (stream stats)\n' + str(ssd)) + # FIXME (temp): assert len(ssd.port) == 2 + assert len(ssd.port[ports.x_num].sguid) == 1 + assert ssd.port[ports.x_num].sguid[101].tx_pkts == npkts + assert ssd.port[ports.x_num].sguid[101].tx_bytes == nbytes + assert len(ssd.port[ports.y_num].sguid) == 1 + # TODO: rx = tx except RpcError as e: raise @@ -484,4 +492,7 @@ def test_unidir(drone, ports, dut, dut_ports, dut_ip, emul_ports, dgid_list, # * Verify that bi-directional stream stats are correct for multiple streams # * Verify protocol combinations - Eth, IPv4/IPv6, TCP/UDP, Pattern # * Verify transmit modes +# * Verify tx pkts = multiple repeats of packetList + partial repeat +# * Verify tx pkts = multiple complete repeats of packetList + no partials +# * Verify tx pkts = first seq partial #