From e9bdfa04ea0b14cf06b1c50e8a6019bf68d859d0 Mon Sep 17 00:00:00 2001 From: Srivats P Date: Thu, 17 Nov 2016 21:44:34 +0530 Subject: [PATCH] sign: implemented rx stream stats - loopback problem to be fixed --- common/protocol.proto | 2 +- common/sign.cpp | 5 ++ common/sign.h | 1 + server/abstractport.cpp | 10 +++ server/abstractport.h | 2 + server/drone.pro | 1 + server/pcapport.cpp | 14 ++++ server/pcapport.h | 5 ++ server/pcaprxstats.cpp | 174 ++++++++++++++++++++++++++++++++++++++++ server/pcaprxstats.h | 52 ++++++++++++ test/streamstatstest.py | 41 +++++++--- 11 files changed, 294 insertions(+), 13 deletions(-) create mode 100644 server/pcaprxstats.cpp create mode 100644 server/pcaprxstats.h diff --git a/common/protocol.proto b/common/protocol.proto index da91021..98a391f 100644 --- a/common/protocol.proto +++ b/common/protocol.proto @@ -205,7 +205,7 @@ message Port { optional bool is_exclusive_control = 6; optional TransmitMode transmit_mode = 7 [default = kSequentialTransmit]; optional string user_name = 8; - optional bool stream_stats_tracking = 9; // FIXME: rename to is_XXX ? + optional bool track_stream_stats = 9; // FIXME: rename to is_XXX ? } message PortConfigList { diff --git a/common/sign.cpp b/common/sign.cpp index 8ac7940..805f5bd 100644 --- a/common/sign.cpp +++ b/common/sign.cpp @@ -195,6 +195,11 @@ _exit: return isOk; } +quint32 SignProtocol::magic() +{ + return kSignMagic; +} + bool SignProtocol::packetGuid(const uchar *pkt, int pktLen, uint *guid) { const uchar *p = pkt + pktLen - sizeof(kSignMagic); diff --git a/common/sign.h b/common/sign.h index 1b8cf03..98963c5 100644 --- a/common/sign.h +++ b/common/sign.h @@ -81,6 +81,7 @@ public: virtual bool setFieldData(int index, const QVariant &value, FieldAttrib attrib = FieldValue); + static quint32 magic(); static bool packetGuid(const uchar *pkt, int pktLen, uint *guid); private: static const quint32 kSignMagic = 0xa1b2c3d4; // FIXME diff --git a/server/abstractport.cpp b/server/abstractport.cpp index 25c74ba..977908a 100644 --- a/server/abstractport.cpp +++ b/server/abstractport.cpp @@ -102,6 +102,16 @@ bool AbstractPort::modify(const OstProto::Port &port) data_.set_user_name(port.user_name()); } + if (port.has_track_stream_stats()) { + bool val = port.track_stream_stats() ? + startStreamStatsTracking() : stopStreamStatsTracking(); + + if (val) + data_.set_track_stream_stats(port.track_stream_stats()); + + ret |= val; + } + return ret; } diff --git a/server/abstractport.h b/server/abstractport.h index afe156c..0324948 100644 --- a/server/abstractport.h +++ b/server/abstractport.h @@ -114,6 +114,8 @@ public: void stats(PortStats *stats); void resetStats() { epochStats_ = stats_; } + virtual bool startStreamStatsTracking() = 0; + virtual bool stopStreamStatsTracking() = 0; // FIXME: combine single and All calls? void streamStats(uint guid, OstProto::StreamStatsList *stats); void streamStatsAll(OstProto::StreamStatsList *stats); diff --git a/server/drone.pro b/server/drone.pro index 68d3a71..376790a 100644 --- a/server/drone.pro +++ b/server/drone.pro @@ -42,6 +42,7 @@ SOURCES += \ abstractport.cpp \ pcapport.cpp \ pcaptransmitter.cpp \ + pcaprxstats.cpp \ pcaptxstats.cpp \ pcaptxthread.cpp \ bsdport.cpp \ diff --git a/server/pcapport.cpp b/server/pcapport.cpp index 8f51b14..fc6aa13 100644 --- a/server/pcapport.cpp +++ b/server/pcapport.cpp @@ -34,6 +34,7 @@ PcapPort::PcapPort(int id, const char *device) transmitter_ = new PcapTransmitter(device, streamStats_); capturer_ = new PortCapturer(device); emulXcvr_ = new EmulationTransceiver(device, deviceManager_); + rxStatsPoller_ = new PcapRxStats(device, streamStats_); if (!monitorRx_->handle() || !monitorTx_->handle()) isUsable_ = false; @@ -82,6 +83,9 @@ PcapPort::~PcapPort() if (monitorTx_) monitorTx_->stop(); + rxStatsPoller_->stop(); + delete rxStatsPoller_; + delete emulXcvr_; delete capturer_; delete transmitter_; @@ -126,6 +130,16 @@ bool PcapPort::setRateAccuracy(AbstractPort::Accuracy accuracy) return false; } +bool PcapPort::startStreamStatsTracking() +{ + return rxStatsPoller_->start(); +} + +bool PcapPort::stopStreamStatsTracking() +{ + return rxStatsPoller_->stop(); +} + void PcapPort::startDeviceEmulation() { emulXcvr_->start(); diff --git a/server/pcapport.h b/server/pcapport.h index 04e8269..dcf662f 100644 --- a/server/pcapport.h +++ b/server/pcapport.h @@ -26,6 +26,7 @@ along with this program. If not, see #include "abstractport.h" #include "pcapextra.h" +#include "pcaprxstats.h" #include "pcaptransmitter.h" class PcapPort : public AbstractPort @@ -71,6 +72,9 @@ public: virtual bool isCaptureOn() { return capturer_->isRunning(); } virtual QIODevice* captureData() { return capturer_->captureFile(); } + virtual bool startStreamStatsTracking(); + virtual bool stopStreamStatsTracking(); + virtual void startDeviceEmulation(); virtual void stopDeviceEmulation(); virtual int sendEmulationPacket(PacketBuffer *pktBuf); @@ -166,6 +170,7 @@ private: PcapTransmitter *transmitter_; PortCapturer *capturer_; EmulationTransceiver *emulXcvr_; + PcapRxStats *rxStatsPoller_; static pcap_if_t *deviceList_; }; diff --git a/server/pcaprxstats.cpp b/server/pcaprxstats.cpp new file mode 100644 index 0000000..32cf9c2 --- /dev/null +++ b/server/pcaprxstats.cpp @@ -0,0 +1,174 @@ +/* +Copyright (C) 2016 Srivats P. + +This file is part of "Ostinato" + +This is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +#include "pcaprxstats.h" +#include "../common/sign.h" + +#define notify qWarning // FIXME + +PcapRxStats::PcapRxStats(const char *device, StreamStats &portStreamStats) + : streamStats_(portStreamStats) +{ + device_ = QString::fromAscii(device); + stop_ = false; + state_ = kNotStarted; + + handle_ = NULL; +} + +pcap_t* PcapRxStats::handle() +{ + return handle_; +} + +void PcapRxStats::run() +{ + int flags = PCAP_OPENFLAG_PROMISCUOUS; + char errbuf[PCAP_ERRBUF_SIZE] = ""; + struct bpf_program bpf; + QString capture_filter = QString("ether[len - 4:4] == 0x%1").arg( + SignProtocol::magic(), 0, BASE_HEX); + const int optimize = 1; + + qDebug("In %s", __PRETTY_FUNCTION__); + +#ifdef Q_OS_WIN32 + flags |= PCAP_OPENFLAG_NOCAPTURE_LOCAL; // FIXME: needed? +#endif + +#ifdef Q_OS_WIN32 +_retry: + // NOCAPTURE_LOCAL needs windows only pcap_open() + handle_ = pcap_open(qPrintable(device_), 65535, + flags, 100 /* ms */, NULL, errbuf); +#else + handle_ = pcap_open_live(qPrintable(device_), 65535, + flags, 100 /* ms */, errbuf); +#endif + + if (handle_ == NULL) { + if (flags && QString(errbuf).contains("promiscuous")) { + notify("Unable to set promiscuous mode on <%s> - " + "stream stats rx will not work", qPrintable(device_)); + goto _exit; + } +#ifdef Q_OS_WIN32 + else if ((flags & PCAP_OPENFLAG_NOCAPTURE_LOCAL) + && QString(errbuf).contains("loopback")) { + qDebug("Can't set no local capture mode %s", qPrintable(device_)); + flags &= ~PCAP_OPENFLAG_NOCAPTURE_LOCAL; + goto _retry; + } +#endif + else { + notify("Unable to open <%s> [%s] - stream stats rx will not work", + qPrintable(device_), errbuf); + goto _exit; + } + } + + if (pcap_compile(handle_, &bpf, qPrintable(capture_filter), + optimize, 0) < 0) { + qWarning("%s: error compiling filter: %s", qPrintable(device_), + pcap_geterr(handle_)); + goto _skip_filter; + } + + if (pcap_setfilter(handle_, &bpf) < 0) { + qWarning("%s: error setting filter: %s", qPrintable(device_), + pcap_geterr(handle_)); + goto _skip_filter; + } + +_skip_filter: + state_ = kRunning; + while (1) { + int ret; + struct pcap_pkthdr *hdr; + const uchar *data; + + ret = pcap_next_ex(handle_, &hdr, &data); + switch (ret) { + case 1: { + uint guid; + if (SignProtocol::packetGuid(data, hdr->caplen, &guid)) { + streamStats_[guid].rx_pkts++; + streamStats_[guid].rx_bytes += hdr->caplen; + } + break; + } + case 0: + // timeout: just go back to the loop + break; + case -1: + qWarning("%s: error reading packet (%d): %s", + __PRETTY_FUNCTION__, ret, pcap_geterr(handle_)); + break; + case -2: + default: + qFatal("%s: Unexpected return value %d", __PRETTY_FUNCTION__, + ret); + } + + if (stop_) { + qDebug("user requested receiver stop\n"); + break; + } + } + pcap_close(handle_); + handle_ = NULL; + stop_ = false; + +_exit: + state_ = kFinished; +} + +bool PcapRxStats::start() +{ + if (state_ == kRunning) { + qWarning("RxStats start requested but is already running!"); + goto _exit; + } + + state_ = kNotStarted; + QThread::start(); + + while (state_ == kNotStarted) + QThread::msleep(10); +_exit: + return true; +} + +bool PcapRxStats::stop() +{ + if (state_ == kRunning) { + stop_ = true; + while (state_ == kRunning) + QThread::msleep(10); + } + else + qWarning("RxStats stop requested but is not running!"); + + return true; +} + +bool PcapRxStats::isRunning() +{ + return (state_ == kRunning); +} diff --git a/server/pcaprxstats.h b/server/pcaprxstats.h new file mode 100644 index 0000000..f13e516 --- /dev/null +++ b/server/pcaprxstats.h @@ -0,0 +1,52 @@ +/* +Copyright (C) 2016 Srivats P. + +This file is part of "Ostinato" + +This is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see +*/ + +#ifndef _PCAP_RX_STATS_H +#define _PCAP_RX_STATS_H + +#include "streamstats.h" + +#include +#include + +class PcapRxStats: public QThread +{ +public: + PcapRxStats(const char *device, StreamStats &portStreamStats); + pcap_t* handle(); + void run(); + bool start(); + bool stop(); + bool isRunning(); + +private: + enum State { + kNotStarted, + kRunning, + kFinished + }; + + QString device_; + StreamStats &streamStats_; + volatile bool stop_; + pcap_t *handle_; + volatile State state_; +}; + +#endif diff --git a/test/streamstatstest.py b/test/streamstatstest.py index 5f53e9e..0ce9332 100644 --- a/test/streamstatstest.py +++ b/test/streamstatstest.py @@ -160,9 +160,9 @@ def ports(request, drone): # Enable stream stats on ports portConfig = ost_pb.PortConfigList() portConfig.port.add().port_id.id = ports.x_num; - portConfig.port[0].stream_stats_tracking = True; + portConfig.port[0].track_stream_stats = True; portConfig.port.add().port_id.id = ports.y_num; - portConfig.port[1].stream_stats_tracking = True; + portConfig.port[1].track_stream_stats = True; print('Enabling Stream Stats tracking on ports X and Y'); drone.modifyPort(portConfig); @@ -586,33 +586,50 @@ def test_unidir(drone, ports, dut, dut_ports, dut_ip, emul_ports, dgid_list, log.info('sign stream 102 rx cap count: %d' % (sign_stream2_cnt)) log.info('--> (stream stats)\n' + str(ssd)) - """ # verify rx stream stats from drone is same as that from capture assert len(ssd.port[ports.y_num].sguid) == 2 if sign_stream_cfg['loop']: - assert ssd.port[ports.y_num].sguid[101].tx_pkts \ + assert ssd.port[ports.y_num].sguid[101].rx_pkts \ == sign_stream1_cnt - assert ssd.port[ports.y_num].sguid[101].tx_bytes \ + assert ssd.port[ports.y_num].sguid[101].rx_bytes \ == (sign_stream1_cnt * (stream.stream[1].core.frame_len - 4)) - assert ssd.port[ports.y_num].sguid[102].tx_pkts \ + assert ssd.port[ports.y_num].sguid[102].rx_pkts \ == sign_stream2_cnt - assert ssd.port[ports.y_num].sguid[102].tx_bytes \ + assert ssd.port[ports.y_num].sguid[102].rx_bytes \ == (sign_stream2_cnt * (stream.stream[2].core.frame_len - 4)) else: - assert ssd.port[ports.y_num].sguid[101].tx_pkts \ + assert ssd.port[ports.y_num].sguid[101].rx_pkts \ == sign_stream_cfg['num_pkts'][0] - assert ssd.port[ports.y_num].sguid[101].tx_bytes \ + assert ssd.port[ports.y_num].sguid[101].rx_bytes \ == (sign_stream_cfg['num_pkts'][0] * (stream.stream[1].core.frame_len - 4)) - assert ssd.port[ports.y_num].sguid[102].tx_pkts \ + assert ssd.port[ports.y_num].sguid[102].rx_pkts \ == sign_stream_cfg['num_pkts'][1] - assert ssd.port[ports.y_num].sguid[102].tx_bytes \ + assert ssd.port[ports.y_num].sguid[102].rx_bytes \ == (sign_stream_cfg['num_pkts'][1] * (stream.stream[2].core.frame_len - 4)) + + # verify tx == rx + assert ssd.port[ports.x_num].sguid[101].tx_pkts \ + == ssd.port[ports.y_num].sguid[101].rx_pkts + assert ssd.port[ports.x_num].sguid[101].tx_bytes \ + == ssd.port[ports.y_num].sguid[101].rx_bytes + assert ssd.port[ports.x_num].sguid[102].tx_pkts \ + == ssd.port[ports.y_num].sguid[102].rx_pkts + assert ssd.port[ports.x_num].sguid[102].tx_bytes \ + == ssd.port[ports.y_num].sguid[102].rx_bytes + + # for unidir verify rx on tx port is 0 and vice versa + # FIXME: failing currently because tx pkts on tx port seem to be + # captured by rxStatsPoller_ on tx port + """ + assert ssd.port[ports.x_num].sguid[101].rx_pkts == 0 + assert ssd.port[ports.x_num].sguid[101].rx_bytes == 0 + assert ssd.port[ports.y_num].sguid[101].tx_pkts == 0 + assert ssd.port[ports.y_num].sguid[101].tx_bytes == 0 """ - # TODO?: rx = tx except RpcError as e: raise