sign: server rpc implementation of stream stats; actual collection of stream stats pending

This commit is contained in:
Srivats P 2016-11-10 18:18:27 +05:30
parent 783db4e832
commit cffada4c07
5 changed files with 119 additions and 8 deletions

View File

@ -272,8 +272,8 @@ message StreamGuid {
}
message StreamGuidList {
repeated StreamGuid stream_guid = 1;
optional PortIdList port_list = 2;
required PortIdList port_id_list = 1;
repeated StreamGuid stream_guid = 2;
}
message StreamStats {

View File

@ -54,6 +54,14 @@ AbstractPort::AbstractPort(int id, const char *device)
maxStatsValue_ = ULLONG_MAX; // assume 64-bit stats
memset((void*) &stats_, 0, sizeof(stats_));
resetStats();
// FIXME: temporary data for testing
{
StreamStatsTuple sst;
streamStats_.insert(1001, sst);
memset(&sst, 0, sizeof(sst));
streamStats_.insert(1002, sst);
}
}
AbstractPort::~AbstractPort()
@ -633,6 +641,54 @@ void AbstractPort::stats(PortStats *stats)
stats_.rxFrameErrors + (maxStatsValue_ - epochStats_.rxFrameErrors);
}
void AbstractPort::streamStats(uint guid, OstProto::StreamStatsList *stats)
{
if (streamStats_.contains(guid))
{
StreamStatsTuple sst = streamStats_.value(guid);
OstProto::StreamStats *s = stats->add_stream_stats();
s->mutable_stream_guid()->set_id(guid);
s->mutable_port_id()->set_id(id());
s->set_tx_pkts(sst.tx_pkts);
s->set_tx_bytes(sst.tx_bytes);
s->set_rx_pkts(sst.rx_pkts);
s->set_rx_bytes(sst.rx_bytes);
}
}
void AbstractPort::streamStatsAll(OstProto::StreamStatsList *stats)
{
// FIXME: change input param to a non-OstProto type and/or have
// a getFirst/Next like API?
QHashIterator<uint, StreamStatsTuple> i(streamStats_);
while (i.hasNext())
{
i.next();
StreamStatsTuple sst = i.value();
OstProto::StreamStats *s = stats->add_stream_stats();
s->mutable_stream_guid()->set_id(i.key());
s->mutable_port_id()->set_id(id());
s->set_tx_pkts(sst.tx_pkts);
s->set_tx_bytes(sst.tx_bytes);
s->set_rx_pkts(sst.rx_pkts);
s->set_rx_bytes(sst.rx_bytes);
}
}
void AbstractPort::resetStreamStats(uint guid)
{
streamStats_.remove(guid);
}
void AbstractPort::resetStreamStatsAll()
{
streamStats_.clear();
}
void AbstractPort::clearDeviceNeighbors()
{
deviceManager_->clearDeviceNeighbors();

View File

@ -20,6 +20,7 @@ along with this program. If not, see <http://www.gnu.org/licenses/>
#ifndef _SERVER_ABSTRACT_PORT_H
#define _SERVER_ABSTRACT_PORT_H
#include <QHash>
#include <QList>
#include <QtGlobal>
@ -112,6 +113,12 @@ public:
void stats(PortStats *stats);
void resetStats() { epochStats_ = stats_; }
// FIXME: combine single and All calls?
void streamStats(uint guid, OstProto::StreamStatsList *stats);
void streamStatsAll(OstProto::StreamStatsList *stats);
void resetStreamStats(uint guid);
void resetStreamStatsAll();
DeviceManager* deviceManager();
virtual void startDeviceEmulation() = 0;
virtual void stopDeviceEmulation() = 0;
@ -124,6 +131,14 @@ public:
quint64 neighborMacAddress(int streamId, int frameIndex);
protected:
struct StreamStatsTuple
{
quint64 rx_pkts;
quint64 rx_bytes;
quint64 tx_pkts;
quint64 tx_bytes;
};
void addNote(QString note);
void updatePacketListSequential();
@ -137,6 +152,7 @@ protected:
quint64 maxStatsValue_;
struct PortStats stats_;
QHash<uint, StreamStatsTuple> streamStats_;
//! \todo Need lock for stats access/update
DeviceManager *deviceManager_;

View File

@ -1,5 +1,5 @@
/*
Copyright (C) 2010-2015 Srivats P.
Copyright (C) 2010-2016 Srivats P.
This file is part of "Ostinato"
@ -577,7 +577,25 @@ void MyService::getStreamStats(
::google::protobuf::Closure* done)
{
qDebug("In %s", __PRETTY_FUNCTION__);
// TODO
for (int i = 0; i < request->port_id_list().port_id_size(); i++)
{
int portId;
portId = request->port_id_list().port_id(i).id();
if ((portId < 0) || (portId >= portInfo.size()))
continue; //! \todo(LOW): partial rpc?
portLock[portId]->lockForRead();
if (request->stream_guid_size())
for (int j = 0; j < request->stream_guid_size(); j++)
portInfo[portId]->streamStats(request->stream_guid(j).id(),
response);
else
portInfo[portId]->streamStatsAll(response);
portLock[portId]->unlock();
}
done->Run();
}
@ -588,7 +606,27 @@ void MyService::clearStreamStats(
::google::protobuf::Closure* done)
{
qDebug("In %s", __PRETTY_FUNCTION__);
// TODO
for (int i = 0; i < request->port_id_list().port_id_size(); i++)
{
int portId;
portId = request->port_id_list().port_id(i).id();
if ((portId < 0) || (portId >= portInfo.size()))
continue; //! \todo (LOW): partial RPC?
portLock[portId]->lockForWrite();
if (request->stream_guid_size())
for (int j = 0; j < request->stream_guid_size(); j++)
portInfo[portId]->resetStreamStats(
request->stream_guid(j).id());
else
portInfo[portId]->resetStreamStatsAll();
portLock[portId]->unlock();
}
//! \todo (LOW): fill-in response "Ack"????
done->Run();
}

View File

@ -350,9 +350,9 @@ def stream(request, drone, ports):
@pytest.fixture(scope='module')
def stream_guids(request, drone, ports):
stream_guids = ost_pb.StreamGuidList()
stream_guids.port_id_list.port_id.add().id = ports.x.port_id[0].id;
stream_guids.port_id_list.port_id.add().id = ports.y.port_id[0].id;
stream_guids.stream_guid.add().id = 101
stream_guids.port_list.port_id.add().id = ports.x.port_id[0].id;
stream_guids.port_list.port_id.add().id = ports.y.port_id[0].id;
return stream_guids
"""
@ -464,9 +464,10 @@ def test_unidir(drone, ports, dut, dut_ports, dut_ip, emul_ports, dgid_list,
os.remove('capture.pcap')
# verify stream stats
stream_guids.ClearField("stream_guid");
stream_stats_list = drone.getStreamStats(stream_guids)
log.info('--> (stream_stats)' + stream_stats_list.__str__())
assert (len(stream_stats_list.stream_stats) > 0)
assert len(stream_stats_list.stream_stats) == 2
# FIXME: verify stream stats