From defdc218bd40301c05023e6c82cf17b99ecdb0da Mon Sep 17 00:00:00 2001 From: Srivats P Date: Tue, 15 Nov 2016 22:23:55 +0530 Subject: [PATCH] sign: fix the tx stream stats calculation and update associated test case --- server/pcaptxthread.cpp | 134 ++++++++++++++++++--------- server/pcaptxthread.h | 5 +- test/streamstatstest.py | 201 +++++++++++++++++++++++++++++++++------- 3 files changed, 261 insertions(+), 79 deletions(-) diff --git a/server/pcaptxthread.cpp b/server/pcaptxthread.cpp index 183a7d4..d13532a 100644 --- a/server/pcaptxthread.cpp +++ b/server/pcaptxthread.cpp @@ -92,6 +92,7 @@ void PcapTxThread::clearPacketList() repeatSize_ = 0; packetCount_ = 0; + packetListSize_ = 0; returnToQIdx_ = -1; setPacketListLoopMode(false, 0, 0); @@ -153,6 +154,8 @@ bool PcapTxThread::appendToPacketList(long sec, long nsec, } packetCount_++; + packetListSize_ += repeatSize_ ? + currentPacketSequence_->repeatCount_ : 1; if (repeatSize_ > 0 && packetCount_ == repeatSize_) { qDebug("repeatSequenceStart_=%d, repeatSize_ = %llu", @@ -441,58 +444,99 @@ void PcapTxThread::updateStreamStats() stats_->pkts + (ULLONG_MAX - lastStats_.pkts); // Calculate - - // number of (complete) repeats of packetList_ - // - this is same as number of repeats of each PacketSequence + // number of complete repeats of packetList_ + // => each PacketSet in the packetList is repeated these many times // number of pkts sent in last partial repeat of packetList_ - // - This encompasses 0 or more potentially partial PacketSequence - // FIXME: incorrect, we need to consider the expandedPacketCount_ i.e. - // the configured repeats of each sequence ('n') - int c = pkts/packetCount_; - int d = pkts%packetCount_; - foreach(PacketSequence *seq, packetSequenceList_) { - uint a = c; - if (d >= seq->packets_) { - // last repeat of this seq was complete - so just bump up the - // repeat count and reduce the full seq pkt count from the - // last repeat pkt sent count - // NOTE: if the packet sent count is an exact multiple of - // packetList_ count, then d will be 0 and we will never come - // inside this condition - Q_ASSERT(seq->packets_); - a = c + 1; - d -= seq->packets_; - } - else if (d) { // (d < seq->packets_) - // last repeat of this seq was partial, so we need to traverse - // this sequence upto 'd' pkts to update streamStats - struct pcap_pkthdr *hdr = - (struct pcap_pkthdr*) seq->sendQueue_->buffer; - char *end = seq->sendQueue_->buffer + seq->sendQueue_->len; + // - This encompasses 0 or more potentially partial PacketSets + // XXX: Note for the above, we consider a PacketSet to include its + // own repeats within itself + int c = pkts/packetListSize_; + int d = pkts%packetListSize_; - while(d && ((char*) hdr < end)) { - uchar *pkt = (uchar*)hdr + sizeof(*hdr); - uint guid; + int i; - if (SignProtocol::packetGuid(pkt, hdr->caplen, &guid)) { - streamStats_[guid].tx_pkts++; - streamStats_[guid].tx_bytes += hdr->caplen; - } + if (!c) + goto _last_repeat; - // Step to the next packet in the buffer - hdr = (struct pcap_pkthdr*) (pkt + hdr->caplen); - d--; + i = 0; + while (i < packetSequenceList_.size()) { + PacketSequence *seq = packetSequenceList_.at(i); + int rptSz = seq->repeatSize_; + int rptCnt = seq->repeatCount_; + + for (int k = 0; k < rptSz; k++) { + seq = packetSequenceList_.at(i+k); + StreamStatsIterator iter(seq->streamStatsMeta_); + while (iter.hasNext()) { + iter.next(); + uint guid = iter.key(); + StreamStatsTuple ssm = iter.value(); + streamStats_[guid].tx_pkts += c * rptCnt * ssm.tx_pkts; + streamStats_[guid].tx_bytes += c * rptCnt * ssm.tx_bytes; } } - - StreamStatsIterator i(seq->streamStatsMeta_); - while (i.hasNext()) { - i.next(); - uint guid = i.key(); - StreamStatsTuple ssm = i.value(); - streamStats_[guid].tx_pkts += a * ssm.tx_pkts; - streamStats_[guid].tx_bytes += a * ssm.tx_bytes; - } + // Move to the next Packet Set + i += rptSz; } + +_last_repeat: + if (!d) + goto _done; + + i = 0; + while (i < packetSequenceList_.size()) { + PacketSequence *seq = packetSequenceList_.at(i); + int rptSz = seq->repeatSize_; + int rptCnt = seq->repeatCount_; + + for (int j = 0; j < rptCnt; j++) { + for (int k = 0; k < rptSz; k++) { + seq = packetSequenceList_.at(i+k); + Q_ASSERT(seq->packets_); + if (d >= seq->packets_) { + // All packets of this seq were sent + StreamStatsIterator iter(seq->streamStatsMeta_); + while (iter.hasNext()) { + iter.next(); + uint guid = iter.key(); + StreamStatsTuple ssm = iter.value(); + streamStats_[guid].tx_pkts += ssm.tx_pkts; + streamStats_[guid].tx_bytes += ssm.tx_bytes; + } + d -= seq->packets_; + } + else { // (d < seq->packets_) + // not all packets of this seq were sent, so we need to + // traverse this seq upto 'd' pkts, parse guid from the + // packet and update streamStats + struct pcap_pkthdr *hdr = + (struct pcap_pkthdr*) seq->sendQueue_->buffer; + char *end = seq->sendQueue_->buffer + seq->sendQueue_->len; + + while(d && ((char*) hdr < end)) { + uchar *pkt = (uchar*)hdr + sizeof(*hdr); + uint guid; + + if (SignProtocol::packetGuid(pkt, hdr->caplen, &guid)) { + streamStats_[guid].tx_pkts++; + streamStats_[guid].tx_bytes += hdr->caplen; + } + + // Step to the next packet in the buffer + hdr = (struct pcap_pkthdr*) (pkt + hdr->caplen); + d--; + } + Q_ASSERT(d == 0); + goto _done; + } + } + } + // Move to the next Packet Set + i += rptSz; + } + +_done: + return; } void PcapTxThread::udelay(unsigned long usec) diff --git a/server/pcaptxthread.h b/server/pcaptxthread.h index 22c4c50..29ea0fa 100644 --- a/server/pcaptxthread.h +++ b/server/pcaptxthread.h @@ -68,12 +68,15 @@ private: int sync); void updateStreamStats(); - QList packetSequenceList_; + // Intermediate state variables used while building the packet list PacketSequence *currentPacketSequence_; int repeatSequenceStart_; quint64 repeatSize_; quint64 packetCount_; + QList packetSequenceList_; + quint64 packetListSize_; // count of pkts in packet List including repeats + int returnToQIdx_; quint64 loopDelay_; diff --git a/test/streamstatstest.py b/test/streamstatstest.py index ba012bd..5f53e9e 100644 --- a/test/streamstatstest.py +++ b/test/streamstatstest.py @@ -5,6 +5,7 @@ import ipaddress import logging import os import pytest +import random import subprocess import sys import time @@ -198,6 +199,7 @@ def dgid_list(request, drone, ports): dg = devgrp_cfg.device_group.add() dg.device_group_id.id = dgid_list.x.device_group_id[0].id dg.core.name = "HostX" + dg.device_count = 5 dg.Extensions[emul.mac].address = 0x000102030a01 @@ -290,16 +292,16 @@ def stream_clear(request, drone, ports): sid_list = drone.getStreamIdList(ports.x.port_id[0]) drone.deleteStream(sid_list) -@pytest.fixture(scope='module') -def stream(request, drone, ports): +@pytest.fixture +def stream(request, drone, ports, sign_stream_cfg): # add stream(s) stream_id = ost_pb.StreamIdList() stream_id.port_id.CopyFrom(ports.x.port_id[0]) - stream_id.stream_id.add().id = 1 # Unsigned stream - stream_id.stream_id.add().id = 101 # Signed stream - log.info('adding X stream(s) %d %d' % - (stream_id.stream_id[0].id, stream_id.stream_id[1].id)) + stream_id.stream_id.add().id = 1 # Unsigned stream + stream_id.stream_id.add().id = 2 # Signed stream + stream_id.stream_id.add().id = 3 # Signed stream + log.info('adding X stream(s) ' + str(stream_id)) drone.addStream(stream_id) # configure the stream(s) @@ -308,11 +310,19 @@ def stream(request, drone, ports): s = stream_cfg.stream.add() s.stream_id.id = stream_id.stream_id[0].id s.core.is_enabled = True - s.control.packets_per_sec = 100 - s.control.num_packets = 10 + # On Win32, packets in a seq that take less than 1s to transmit + # are transmitted in one shot using pcap_send_queue; this means that + # stopTransmit() cannot stop them in the middle of the sequence and + # stop will happen only at the pktSeq boundary + # Since we want to test cases where stop is trigerred in the middle of + # a sequence, we set pps < 1000 + # FIXME: for some reason values such as 100pps is also leading to + # stop only at seq boundary - need to debug + s.control.packets_per_sec = 523 # setup (unsigned) stream protocols as mac:eth2:ip:payload p = s.protocol.add() + s.control.num_packets = 10 p.protocol_id.id = ost_pb.Protocol.kMacFieldNumber p.Extensions[mac].dst_mac_mode = Mac.e_mm_resolve p.Extensions[mac].src_mac_mode = Mac.e_mm_resolve @@ -326,11 +336,16 @@ def stream(request, drone, ports): s.protocol.add().protocol_id.id = ost_pb.Protocol.kPayloadFieldNumber - # setup (signed) stream protocols as mac:eth2:ip:udp:payload + # setup (signed) stream protocols as mac:eth2:ip:udp|tcp:payload # Remove payload, add udp, payload and sign protocol to signed stream(s) + sscfg = sign_stream_cfg s = stream_cfg.stream.add() s.CopyFrom(stream_cfg.stream[0]) s.stream_id.id = stream_id.stream_id[1].id + s.control.num_packets = sscfg['num_pkts'][0] + if sscfg['num_var_pkts'][0]: + s.protocol[2].Extensions[ip4].src_ip_mode = Ip4.e_im_inc_host + s.protocol[2].Extensions[ip4].src_ip_count = sscfg['num_var_pkts'][0] del s.protocol[-1] s.protocol.add().protocol_id.id = ost_pb.Protocol.kUdpFieldNumber s.protocol.add().protocol_id.id = ost_pb.Protocol.kPayloadFieldNumber @@ -338,9 +353,25 @@ def stream(request, drone, ports): p.protocol_id.id = ost_pb.Protocol.kSignFieldNumber p.Extensions[sign].stream_guid = 101 + s = stream_cfg.stream.add() + s.CopyFrom(stream_cfg.stream[1]) + s.stream_id.id = stream_id.stream_id[2].id + s.core.frame_len = 128 + s.control.num_packets = sscfg['num_pkts'][1] + if sscfg['num_var_pkts'][1]: + s.protocol[2].Extensions[ip4].src_ip_mode = Ip4.e_im_inc_host + s.protocol[2].Extensions[ip4].src_ip_count = sscfg['num_var_pkts'][1] + s.protocol[-3].protocol_id.id = ost_pb.Protocol.kTcpFieldNumber + s.protocol[-1].Extensions[sign].stream_guid = 102 + + if sscfg['loop']: + s.control.next = ost_pb.StreamControl.e_nw_goto_id + + drone.modifyStream(stream_cfg) + def fin(): # delete streams - log.info('deleting tx_stream %d' % stream_id.stream_id[0].id) + log.info('deleting tx_stream ' + str(stream_id)) drone.deleteStream(stream_id) request.addfinalizer(fin) @@ -399,8 +430,13 @@ def stream_toggle_payload(request, drone, ports): # ----------------------------------------------------------------- # # ================================================================= # +@pytest.mark.parametrize('sign_stream_cfg', [ + {'num_pkts': [270, 512], 'num_var_pkts': [0, 0], 'loop': False}, + {'num_pkts': [276, 510], 'num_var_pkts': [0, 0], 'loop': True}, + #{'num_pkts': [19, 30], 'num_var_pkts': [0, 0], 'loop': True}, +]) def test_unidir(drone, ports, dut, dut_ports, dut_ip, emul_ports, dgid_list, - stream_clear, stream, stream_guids): + sign_stream_cfg, stream_clear, stream, stream_guids): """ TESTCASE: Verify that uni-directional stream stats are correct for a single signed stream X --> Y DUT @@ -412,12 +448,10 @@ def test_unidir(drone, ports, dut, dut_ports, dut_ip, emul_ports, dgid_list, /.101 \.101 HostX HostY """ - npkts = 14 - log.info('configuring stream %d on port X' % stream.stream[0].stream_id.id) - stream.stream[1].control.num_packets = npkts - nbytes = npkts * (stream.stream[1].core.frame_len - 4) - drone.modifyStream(stream) + # calculate tx pkts + total_tx_pkts = 10 + sign_stream_cfg['num_pkts'][0] \ + + sign_stream_cfg['num_pkts'][1] # clear port X/Y stats log.info('clearing stats') drone.clearStats(ports.x) @@ -428,7 +462,6 @@ def test_unidir(drone, ports, dut, dut_ports, dut_ip, emul_ports, dgid_list, ssd = drone.getStreamStatsDict(stream_guids) assert len(ssd.port) == 0 - # resolve ARP/NDP on ports X/Y log.info('resolving Neighbors on (X, Y) ports ...') drone.resolveDeviceNeighbors(emul_ports) @@ -438,46 +471,148 @@ def test_unidir(drone, ports, dut, dut_ports, dut_ip, emul_ports, dgid_list, try: drone.startCapture(ports.y) + drone.startCapture(ports.x) + time.sleep(1) drone.startTransmit(ports.x) log.info('waiting for transmit to finish ...') - time.sleep(3) + time.sleep(random.randint(3, 10)) drone.stopTransmit(ports.x) + time.sleep(1) + drone.stopCapture(ports.x) drone.stopCapture(ports.y) # verify port stats x_stats = drone.getStats(ports.x) log.info('--> (x_stats)' + x_stats.__str__()) - assert(x_stats.port_stats[0].tx_pkts >= 20) + if not sign_stream_cfg['loop']: + assert(x_stats.port_stats[0].tx_pkts >= total_tx_pkts) y_stats = drone.getStats(ports.y) log.info('--> (y_stats)' + y_stats.__str__()) - assert(y_stats.port_stats[0].rx_pkts >= 20) + if not sign_stream_cfg['loop']: + assert(y_stats.port_stats[0].rx_pkts >= total_tx_pkts) + + ssd = drone.getStreamStatsDict(stream_guids) + # FIXME (temp): assert len(ssd.port) == 2 + + # dump X capture buffer + log.info('getting X capture buffer') + buff = drone.getCaptureBuffer(ports.x.port_id[0]) + drone.saveCaptureBuffer(buff, 'capX.pcap') + log.info('dumping X capture buffer') + cap_pkts = subprocess.check_output([tshark, '-n', '-r', 'capX.pcap']) + print(cap_pkts) + + # get sign stream #1 Tx stats from capture + filter="frame[-9:9]==00.00.00.65.61.a1.b2.c3.d4" + print(filter) + log.info('dumping X capture buffer (filtered)') + cap_pkts = subprocess.check_output([tshark, '-n', '-r', 'capX.pcap', + '-Y', filter]) + print(cap_pkts) + sign_stream1_cnt = cap_pkts.count('\n') + + # get sign stream #2 Tx stats from capture + filter="frame[-9:9]==00.00.00.66.61.a1.b2.c3.d4" + print(filter) + log.info('dumping Y capture buffer (filtered)') + cap_pkts = subprocess.check_output([tshark, '-n', '-r', 'capX.pcap', + '-Y', filter]) + print(cap_pkts) + sign_stream2_cnt = cap_pkts.count('\n') + + os.remove('capX.pcap') + + log.info('sign stream 101 tx cap count: %d' % (sign_stream1_cnt)) + log.info('sign stream 102 tx cap count: %d' % (sign_stream2_cnt)) + log.info('--> (stream stats)\n' + str(ssd)) + + # verify tx stream stats from drone is same as that from capture + assert len(ssd.port[ports.x_num].sguid) == 2 + if sign_stream_cfg['loop']: + assert ssd.port[ports.x_num].sguid[101].tx_pkts \ + == sign_stream1_cnt + assert ssd.port[ports.x_num].sguid[101].tx_bytes \ + == (sign_stream1_cnt + * (stream.stream[1].core.frame_len - 4)) + assert ssd.port[ports.x_num].sguid[102].tx_pkts \ + == sign_stream2_cnt + assert ssd.port[ports.x_num].sguid[102].tx_bytes \ + == (sign_stream2_cnt + * (stream.stream[2].core.frame_len - 4)) + else: + assert ssd.port[ports.x_num].sguid[101].tx_pkts \ + == sign_stream_cfg['num_pkts'][0] + assert ssd.port[ports.x_num].sguid[101].tx_bytes \ + == (sign_stream_cfg['num_pkts'][0] + * (stream.stream[1].core.frame_len - 4)) + assert ssd.port[ports.x_num].sguid[102].tx_pkts \ + == sign_stream_cfg['num_pkts'][1] + assert ssd.port[ports.x_num].sguid[102].tx_bytes \ + == (sign_stream_cfg['num_pkts'][1] + * (stream.stream[2].core.frame_len - 4)) # dump Y capture buffer log.info('getting Y capture buffer') buff = drone.getCaptureBuffer(ports.y.port_id[0]) - drone.saveCaptureBuffer(buff, 'capture.pcap') + drone.saveCaptureBuffer(buff, 'capY.pcap') log.info('dumping Y capture buffer') - cap_pkts = subprocess.check_output([tshark, '-n', '-r', 'capture.pcap']) + cap_pkts = subprocess.check_output([tshark, '-n', '-r', 'capY.pcap']) print(cap_pkts) + + # get sign stream #1 Rx stats from capture filter="frame[-9:9]==00.00.00.65.61.a1.b2.c3.d4" print(filter) log.info('dumping Y capture buffer (filtered)') - cap_pkts = subprocess.check_output([tshark, '-n', '-r', 'capture.pcap', + cap_pkts = subprocess.check_output([tshark, '-n', '-r', 'capY.pcap', '-Y', filter]) print(cap_pkts) - assert cap_pkts.count('\n') == npkts - os.remove('capture.pcap') + sign_stream1_cnt = cap_pkts.count('\n') - # verify stream stats - ssd = drone.getStreamStatsDict(stream_guids) + # get sign stream #2 Rx stats from capture + filter="frame[-9:9]==00.00.00.66.61.a1.b2.c3.d4" + print(filter) + log.info('dumping Y capture buffer (filtered)') + cap_pkts = subprocess.check_output([tshark, '-n', '-r', 'capY.pcap', + '-Y', filter]) + print(cap_pkts) + sign_stream2_cnt = cap_pkts.count('\n') + + os.remove('capY.pcap') + + log.info('--> (x_stats)' + x_stats.__str__()) + log.info('--> (y_stats)' + y_stats.__str__()) + log.info('sign stream 101 rx cap count: %d' % (sign_stream1_cnt)) + log.info('sign stream 102 rx cap count: %d' % (sign_stream2_cnt)) log.info('--> (stream stats)\n' + str(ssd)) - # FIXME (temp): assert len(ssd.port) == 2 - assert len(ssd.port[ports.x_num].sguid) == 1 - assert ssd.port[ports.x_num].sguid[101].tx_pkts == npkts - assert ssd.port[ports.x_num].sguid[101].tx_bytes == nbytes - assert len(ssd.port[ports.y_num].sguid) == 1 - # TODO: rx = tx + + """ + # verify rx stream stats from drone is same as that from capture + assert len(ssd.port[ports.y_num].sguid) == 2 + if sign_stream_cfg['loop']: + assert ssd.port[ports.y_num].sguid[101].tx_pkts \ + == sign_stream1_cnt + assert ssd.port[ports.y_num].sguid[101].tx_bytes \ + == (sign_stream1_cnt + * (stream.stream[1].core.frame_len - 4)) + assert ssd.port[ports.y_num].sguid[102].tx_pkts \ + == sign_stream2_cnt + assert ssd.port[ports.y_num].sguid[102].tx_bytes \ + == (sign_stream2_cnt + * (stream.stream[2].core.frame_len - 4)) + else: + assert ssd.port[ports.y_num].sguid[101].tx_pkts \ + == sign_stream_cfg['num_pkts'][0] + assert ssd.port[ports.y_num].sguid[101].tx_bytes \ + == (sign_stream_cfg['num_pkts'][0] + * (stream.stream[1].core.frame_len - 4)) + assert ssd.port[ports.y_num].sguid[102].tx_pkts \ + == sign_stream_cfg['num_pkts'][1] + assert ssd.port[ports.y_num].sguid[102].tx_bytes \ + == (sign_stream_cfg['num_pkts'][1] + * (stream.stream[2].core.frame_len - 4)) + """ + # TODO?: rx = tx except RpcError as e: raise