sign: implemented rx stream stats - loopback problem to be fixed

This commit is contained in:
Srivats P 2016-11-17 21:44:34 +05:30
parent defdc218bd
commit e9bdfa04ea
11 changed files with 294 additions and 13 deletions

View File

@ -205,7 +205,7 @@ message Port {
optional bool is_exclusive_control = 6;
optional TransmitMode transmit_mode = 7 [default = kSequentialTransmit];
optional string user_name = 8;
optional bool stream_stats_tracking = 9; // FIXME: rename to is_XXX ?
optional bool track_stream_stats = 9; // FIXME: rename to is_XXX ?
}
message PortConfigList {

View File

@ -195,6 +195,11 @@ _exit:
return isOk;
}
quint32 SignProtocol::magic()
{
return kSignMagic;
}
bool SignProtocol::packetGuid(const uchar *pkt, int pktLen, uint *guid)
{
const uchar *p = pkt + pktLen - sizeof(kSignMagic);

View File

@ -81,6 +81,7 @@ public:
virtual bool setFieldData(int index, const QVariant &value,
FieldAttrib attrib = FieldValue);
static quint32 magic();
static bool packetGuid(const uchar *pkt, int pktLen, uint *guid);
private:
static const quint32 kSignMagic = 0xa1b2c3d4; // FIXME

View File

@ -102,6 +102,16 @@ bool AbstractPort::modify(const OstProto::Port &port)
data_.set_user_name(port.user_name());
}
if (port.has_track_stream_stats()) {
bool val = port.track_stream_stats() ?
startStreamStatsTracking() : stopStreamStatsTracking();
if (val)
data_.set_track_stream_stats(port.track_stream_stats());
ret |= val;
}
return ret;
}

View File

@ -114,6 +114,8 @@ public:
void stats(PortStats *stats);
void resetStats() { epochStats_ = stats_; }
virtual bool startStreamStatsTracking() = 0;
virtual bool stopStreamStatsTracking() = 0;
// FIXME: combine single and All calls?
void streamStats(uint guid, OstProto::StreamStatsList *stats);
void streamStatsAll(OstProto::StreamStatsList *stats);

View File

@ -42,6 +42,7 @@ SOURCES += \
abstractport.cpp \
pcapport.cpp \
pcaptransmitter.cpp \
pcaprxstats.cpp \
pcaptxstats.cpp \
pcaptxthread.cpp \
bsdport.cpp \

View File

@ -34,6 +34,7 @@ PcapPort::PcapPort(int id, const char *device)
transmitter_ = new PcapTransmitter(device, streamStats_);
capturer_ = new PortCapturer(device);
emulXcvr_ = new EmulationTransceiver(device, deviceManager_);
rxStatsPoller_ = new PcapRxStats(device, streamStats_);
if (!monitorRx_->handle() || !monitorTx_->handle())
isUsable_ = false;
@ -82,6 +83,9 @@ PcapPort::~PcapPort()
if (monitorTx_)
monitorTx_->stop();
rxStatsPoller_->stop();
delete rxStatsPoller_;
delete emulXcvr_;
delete capturer_;
delete transmitter_;
@ -126,6 +130,16 @@ bool PcapPort::setRateAccuracy(AbstractPort::Accuracy accuracy)
return false;
}
bool PcapPort::startStreamStatsTracking()
{
return rxStatsPoller_->start();
}
bool PcapPort::stopStreamStatsTracking()
{
return rxStatsPoller_->stop();
}
void PcapPort::startDeviceEmulation()
{
emulXcvr_->start();

View File

@ -26,6 +26,7 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
#include "abstractport.h"
#include "pcapextra.h"
#include "pcaprxstats.h"
#include "pcaptransmitter.h"
class PcapPort : public AbstractPort
@ -71,6 +72,9 @@ public:
virtual bool isCaptureOn() { return capturer_->isRunning(); }
virtual QIODevice* captureData() { return capturer_->captureFile(); }
virtual bool startStreamStatsTracking();
virtual bool stopStreamStatsTracking();
virtual void startDeviceEmulation();
virtual void stopDeviceEmulation();
virtual int sendEmulationPacket(PacketBuffer *pktBuf);
@ -166,6 +170,7 @@ private:
PcapTransmitter *transmitter_;
PortCapturer *capturer_;
EmulationTransceiver *emulXcvr_;
PcapRxStats *rxStatsPoller_;
static pcap_if_t *deviceList_;
};

174
server/pcaprxstats.cpp Normal file
View File

@ -0,0 +1,174 @@
/*
Copyright (C) 2016 Srivats P.
This file is part of "Ostinato"
This is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
#include "pcaprxstats.h"
#include "../common/sign.h"
#define notify qWarning // FIXME
PcapRxStats::PcapRxStats(const char *device, StreamStats &portStreamStats)
: streamStats_(portStreamStats)
{
device_ = QString::fromAscii(device);
stop_ = false;
state_ = kNotStarted;
handle_ = NULL;
}
pcap_t* PcapRxStats::handle()
{
return handle_;
}
void PcapRxStats::run()
{
int flags = PCAP_OPENFLAG_PROMISCUOUS;
char errbuf[PCAP_ERRBUF_SIZE] = "";
struct bpf_program bpf;
QString capture_filter = QString("ether[len - 4:4] == 0x%1").arg(
SignProtocol::magic(), 0, BASE_HEX);
const int optimize = 1;
qDebug("In %s", __PRETTY_FUNCTION__);
#ifdef Q_OS_WIN32
flags |= PCAP_OPENFLAG_NOCAPTURE_LOCAL; // FIXME: needed?
#endif
#ifdef Q_OS_WIN32
_retry:
// NOCAPTURE_LOCAL needs windows only pcap_open()
handle_ = pcap_open(qPrintable(device_), 65535,
flags, 100 /* ms */, NULL, errbuf);
#else
handle_ = pcap_open_live(qPrintable(device_), 65535,
flags, 100 /* ms */, errbuf);
#endif
if (handle_ == NULL) {
if (flags && QString(errbuf).contains("promiscuous")) {
notify("Unable to set promiscuous mode on <%s> - "
"stream stats rx will not work", qPrintable(device_));
goto _exit;
}
#ifdef Q_OS_WIN32
else if ((flags & PCAP_OPENFLAG_NOCAPTURE_LOCAL)
&& QString(errbuf).contains("loopback")) {
qDebug("Can't set no local capture mode %s", qPrintable(device_));
flags &= ~PCAP_OPENFLAG_NOCAPTURE_LOCAL;
goto _retry;
}
#endif
else {
notify("Unable to open <%s> [%s] - stream stats rx will not work",
qPrintable(device_), errbuf);
goto _exit;
}
}
if (pcap_compile(handle_, &bpf, qPrintable(capture_filter),
optimize, 0) < 0) {
qWarning("%s: error compiling filter: %s", qPrintable(device_),
pcap_geterr(handle_));
goto _skip_filter;
}
if (pcap_setfilter(handle_, &bpf) < 0) {
qWarning("%s: error setting filter: %s", qPrintable(device_),
pcap_geterr(handle_));
goto _skip_filter;
}
_skip_filter:
state_ = kRunning;
while (1) {
int ret;
struct pcap_pkthdr *hdr;
const uchar *data;
ret = pcap_next_ex(handle_, &hdr, &data);
switch (ret) {
case 1: {
uint guid;
if (SignProtocol::packetGuid(data, hdr->caplen, &guid)) {
streamStats_[guid].rx_pkts++;
streamStats_[guid].rx_bytes += hdr->caplen;
}
break;
}
case 0:
// timeout: just go back to the loop
break;
case -1:
qWarning("%s: error reading packet (%d): %s",
__PRETTY_FUNCTION__, ret, pcap_geterr(handle_));
break;
case -2:
default:
qFatal("%s: Unexpected return value %d", __PRETTY_FUNCTION__,
ret);
}
if (stop_) {
qDebug("user requested receiver stop\n");
break;
}
}
pcap_close(handle_);
handle_ = NULL;
stop_ = false;
_exit:
state_ = kFinished;
}
bool PcapRxStats::start()
{
if (state_ == kRunning) {
qWarning("RxStats start requested but is already running!");
goto _exit;
}
state_ = kNotStarted;
QThread::start();
while (state_ == kNotStarted)
QThread::msleep(10);
_exit:
return true;
}
bool PcapRxStats::stop()
{
if (state_ == kRunning) {
stop_ = true;
while (state_ == kRunning)
QThread::msleep(10);
}
else
qWarning("RxStats stop requested but is not running!");
return true;
}
bool PcapRxStats::isRunning()
{
return (state_ == kRunning);
}

52
server/pcaprxstats.h Normal file
View File

@ -0,0 +1,52 @@
/*
Copyright (C) 2016 Srivats P.
This file is part of "Ostinato"
This is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>
*/
#ifndef _PCAP_RX_STATS_H
#define _PCAP_RX_STATS_H
#include "streamstats.h"
#include <QThread>
#include <pcap.h>
class PcapRxStats: public QThread
{
public:
PcapRxStats(const char *device, StreamStats &portStreamStats);
pcap_t* handle();
void run();
bool start();
bool stop();
bool isRunning();
private:
enum State {
kNotStarted,
kRunning,
kFinished
};
QString device_;
StreamStats &streamStats_;
volatile bool stop_;
pcap_t *handle_;
volatile State state_;
};
#endif

View File

@ -160,9 +160,9 @@ def ports(request, drone):
# Enable stream stats on ports
portConfig = ost_pb.PortConfigList()
portConfig.port.add().port_id.id = ports.x_num;
portConfig.port[0].stream_stats_tracking = True;
portConfig.port[0].track_stream_stats = True;
portConfig.port.add().port_id.id = ports.y_num;
portConfig.port[1].stream_stats_tracking = True;
portConfig.port[1].track_stream_stats = True;
print('Enabling Stream Stats tracking on ports X and Y');
drone.modifyPort(portConfig);
@ -586,33 +586,50 @@ def test_unidir(drone, ports, dut, dut_ports, dut_ip, emul_ports, dgid_list,
log.info('sign stream 102 rx cap count: %d' % (sign_stream2_cnt))
log.info('--> (stream stats)\n' + str(ssd))
"""
# verify rx stream stats from drone is same as that from capture
assert len(ssd.port[ports.y_num].sguid) == 2
if sign_stream_cfg['loop']:
assert ssd.port[ports.y_num].sguid[101].tx_pkts \
assert ssd.port[ports.y_num].sguid[101].rx_pkts \
== sign_stream1_cnt
assert ssd.port[ports.y_num].sguid[101].tx_bytes \
assert ssd.port[ports.y_num].sguid[101].rx_bytes \
== (sign_stream1_cnt
* (stream.stream[1].core.frame_len - 4))
assert ssd.port[ports.y_num].sguid[102].tx_pkts \
assert ssd.port[ports.y_num].sguid[102].rx_pkts \
== sign_stream2_cnt
assert ssd.port[ports.y_num].sguid[102].tx_bytes \
assert ssd.port[ports.y_num].sguid[102].rx_bytes \
== (sign_stream2_cnt
* (stream.stream[2].core.frame_len - 4))
else:
assert ssd.port[ports.y_num].sguid[101].tx_pkts \
assert ssd.port[ports.y_num].sguid[101].rx_pkts \
== sign_stream_cfg['num_pkts'][0]
assert ssd.port[ports.y_num].sguid[101].tx_bytes \
assert ssd.port[ports.y_num].sguid[101].rx_bytes \
== (sign_stream_cfg['num_pkts'][0]
* (stream.stream[1].core.frame_len - 4))
assert ssd.port[ports.y_num].sguid[102].tx_pkts \
assert ssd.port[ports.y_num].sguid[102].rx_pkts \
== sign_stream_cfg['num_pkts'][1]
assert ssd.port[ports.y_num].sguid[102].tx_bytes \
assert ssd.port[ports.y_num].sguid[102].rx_bytes \
== (sign_stream_cfg['num_pkts'][1]
* (stream.stream[2].core.frame_len - 4))
# verify tx == rx
assert ssd.port[ports.x_num].sguid[101].tx_pkts \
== ssd.port[ports.y_num].sguid[101].rx_pkts
assert ssd.port[ports.x_num].sguid[101].tx_bytes \
== ssd.port[ports.y_num].sguid[101].rx_bytes
assert ssd.port[ports.x_num].sguid[102].tx_pkts \
== ssd.port[ports.y_num].sguid[102].rx_pkts
assert ssd.port[ports.x_num].sguid[102].tx_bytes \
== ssd.port[ports.y_num].sguid[102].rx_bytes
# for unidir verify rx on tx port is 0 and vice versa
# FIXME: failing currently because tx pkts on tx port seem to be
# captured by rxStatsPoller_ on tx port
"""
assert ssd.port[ports.x_num].sguid[101].rx_pkts == 0
assert ssd.port[ports.x_num].sguid[101].rx_bytes == 0
assert ssd.port[ports.y_num].sguid[101].tx_pkts == 0
assert ssd.port[ports.y_num].sguid[101].tx_bytes == 0
"""
# TODO?: rx = tx
except RpcError as e:
raise