From cffada4c074717ab1703763d5f34fc5afaf11c82 Mon Sep 17 00:00:00 2001 From: Srivats P Date: Thu, 10 Nov 2016 18:18:27 +0530 Subject: [PATCH] sign: server rpc implementation of stream stats; actual collection of stream stats pending --- common/protocol.proto | 4 +-- server/abstractport.cpp | 56 +++++++++++++++++++++++++++++++++++++++++ server/abstractport.h | 16 ++++++++++++ server/myservice.cpp | 44 +++++++++++++++++++++++++++++--- test/streamstatstest.py | 7 +++--- 5 files changed, 119 insertions(+), 8 deletions(-) diff --git a/common/protocol.proto b/common/protocol.proto index 987e2f0..600ce56 100644 --- a/common/protocol.proto +++ b/common/protocol.proto @@ -272,8 +272,8 @@ message StreamGuid { } message StreamGuidList { - repeated StreamGuid stream_guid = 1; - optional PortIdList port_list = 2; + required PortIdList port_id_list = 1; + repeated StreamGuid stream_guid = 2; } message StreamStats { diff --git a/server/abstractport.cpp b/server/abstractport.cpp index 7762bf9..4acdfd4 100644 --- a/server/abstractport.cpp +++ b/server/abstractport.cpp @@ -54,6 +54,14 @@ AbstractPort::AbstractPort(int id, const char *device) maxStatsValue_ = ULLONG_MAX; // assume 64-bit stats memset((void*) &stats_, 0, sizeof(stats_)); resetStats(); + + // FIXME: temporary data for testing + { + StreamStatsTuple sst; + streamStats_.insert(1001, sst); + memset(&sst, 0, sizeof(sst)); + streamStats_.insert(1002, sst); + } } AbstractPort::~AbstractPort() @@ -633,6 +641,54 @@ void AbstractPort::stats(PortStats *stats) stats_.rxFrameErrors + (maxStatsValue_ - epochStats_.rxFrameErrors); } +void AbstractPort::streamStats(uint guid, OstProto::StreamStatsList *stats) +{ + if (streamStats_.contains(guid)) + { + StreamStatsTuple sst = streamStats_.value(guid); + OstProto::StreamStats *s = stats->add_stream_stats(); + + s->mutable_stream_guid()->set_id(guid); + s->mutable_port_id()->set_id(id()); + + s->set_tx_pkts(sst.tx_pkts); + s->set_tx_bytes(sst.tx_bytes); + s->set_rx_pkts(sst.rx_pkts); + s->set_rx_bytes(sst.rx_bytes); + } +} + +void AbstractPort::streamStatsAll(OstProto::StreamStatsList *stats) +{ + // FIXME: change input param to a non-OstProto type and/or have + // a getFirst/Next like API? + QHashIterator i(streamStats_); + while (i.hasNext()) + { + i.next(); + StreamStatsTuple sst = i.value(); + OstProto::StreamStats *s = stats->add_stream_stats(); + + s->mutable_stream_guid()->set_id(i.key()); + s->mutable_port_id()->set_id(id()); + + s->set_tx_pkts(sst.tx_pkts); + s->set_tx_bytes(sst.tx_bytes); + s->set_rx_pkts(sst.rx_pkts); + s->set_rx_bytes(sst.rx_bytes); + } +} + +void AbstractPort::resetStreamStats(uint guid) +{ + streamStats_.remove(guid); +} + +void AbstractPort::resetStreamStatsAll() +{ + streamStats_.clear(); +} + void AbstractPort::clearDeviceNeighbors() { deviceManager_->clearDeviceNeighbors(); diff --git a/server/abstractport.h b/server/abstractport.h index 7eb3996..390b941 100644 --- a/server/abstractport.h +++ b/server/abstractport.h @@ -20,6 +20,7 @@ along with this program. If not, see #ifndef _SERVER_ABSTRACT_PORT_H #define _SERVER_ABSTRACT_PORT_H +#include #include #include @@ -112,6 +113,12 @@ public: void stats(PortStats *stats); void resetStats() { epochStats_ = stats_; } + // FIXME: combine single and All calls? + void streamStats(uint guid, OstProto::StreamStatsList *stats); + void streamStatsAll(OstProto::StreamStatsList *stats); + void resetStreamStats(uint guid); + void resetStreamStatsAll(); + DeviceManager* deviceManager(); virtual void startDeviceEmulation() = 0; virtual void stopDeviceEmulation() = 0; @@ -124,6 +131,14 @@ public: quint64 neighborMacAddress(int streamId, int frameIndex); protected: + struct StreamStatsTuple + { + quint64 rx_pkts; + quint64 rx_bytes; + quint64 tx_pkts; + quint64 tx_bytes; + }; + void addNote(QString note); void updatePacketListSequential(); @@ -137,6 +152,7 @@ protected: quint64 maxStatsValue_; struct PortStats stats_; + QHash streamStats_; //! \todo Need lock for stats access/update DeviceManager *deviceManager_; diff --git a/server/myservice.cpp b/server/myservice.cpp index 927a715..6f3ff19 100644 --- a/server/myservice.cpp +++ b/server/myservice.cpp @@ -1,5 +1,5 @@ /* -Copyright (C) 2010-2015 Srivats P. +Copyright (C) 2010-2016 Srivats P. This file is part of "Ostinato" @@ -577,7 +577,25 @@ void MyService::getStreamStats( ::google::protobuf::Closure* done) { qDebug("In %s", __PRETTY_FUNCTION__); - // TODO + + for (int i = 0; i < request->port_id_list().port_id_size(); i++) + { + int portId; + + portId = request->port_id_list().port_id(i).id(); + if ((portId < 0) || (portId >= portInfo.size())) + continue; //! \todo(LOW): partial rpc? + + portLock[portId]->lockForRead(); + if (request->stream_guid_size()) + for (int j = 0; j < request->stream_guid_size(); j++) + portInfo[portId]->streamStats(request->stream_guid(j).id(), + response); + else + portInfo[portId]->streamStatsAll(response); + portLock[portId]->unlock(); + } + done->Run(); } @@ -588,7 +606,27 @@ void MyService::clearStreamStats( ::google::protobuf::Closure* done) { qDebug("In %s", __PRETTY_FUNCTION__); - // TODO + + for (int i = 0; i < request->port_id_list().port_id_size(); i++) + { + int portId; + + portId = request->port_id_list().port_id(i).id(); + if ((portId < 0) || (portId >= portInfo.size())) + continue; //! \todo (LOW): partial RPC? + + portLock[portId]->lockForWrite(); + if (request->stream_guid_size()) + for (int j = 0; j < request->stream_guid_size(); j++) + portInfo[portId]->resetStreamStats( + request->stream_guid(j).id()); + else + portInfo[portId]->resetStreamStatsAll(); + portLock[portId]->unlock(); + } + + //! \todo (LOW): fill-in response "Ack"???? + done->Run(); } diff --git a/test/streamstatstest.py b/test/streamstatstest.py index 7998d53..d76cede 100644 --- a/test/streamstatstest.py +++ b/test/streamstatstest.py @@ -350,9 +350,9 @@ def stream(request, drone, ports): @pytest.fixture(scope='module') def stream_guids(request, drone, ports): stream_guids = ost_pb.StreamGuidList() + stream_guids.port_id_list.port_id.add().id = ports.x.port_id[0].id; + stream_guids.port_id_list.port_id.add().id = ports.y.port_id[0].id; stream_guids.stream_guid.add().id = 101 - stream_guids.port_list.port_id.add().id = ports.x.port_id[0].id; - stream_guids.port_list.port_id.add().id = ports.y.port_id[0].id; return stream_guids """ @@ -464,9 +464,10 @@ def test_unidir(drone, ports, dut, dut_ports, dut_ip, emul_ports, dgid_list, os.remove('capture.pcap') # verify stream stats + stream_guids.ClearField("stream_guid"); stream_stats_list = drone.getStreamStats(stream_guids) log.info('--> (stream_stats)' + stream_stats_list.__str__()) - assert (len(stream_stats_list.stream_stats) > 0) + assert len(stream_stats_list.stream_stats) == 2 # FIXME: verify stream stats